Merge pull request 'backend/new-overpass' (#52) from backend/new-overpass into main

Reviewed-on: #52
kscheidecker 2025-01-23 15:34:21 +00:00
commit 8d9e2d9207
43 changed files with 3242 additions and 198482 deletions


@ -28,5 +28,5 @@ jobs:
working-directory: backend
- name: Run linter
run: pipenv run pylint src --fail-under=9
run: pipenv run pylint src --fail-under=9
working-directory: backend


@ -28,7 +28,7 @@ jobs:
working-directory: backend
- name: Run Tests
run: pipenv run pytest src --html=report.html --self-contained-html
run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=INFO
working-directory: backend
- name: Upload HTML report


@ -24,8 +24,6 @@ Refer to the READMEs in the `frontend` and `backend` directories for instruction
- `google_maps_flutter` plugin
- Python 3
- `fastapi`
- `OSMPythonTools`
- `numpy, scipy`
- Docker

8
backend/.gitignore vendored

@ -1,9 +1,5 @@
# osm-cache and wikidata cache
cache/
apicache/
# wikidata throttle
*.ctrl
# osm-cache
cache_XML/
# Byte-compiled / optimized / DLL files
__pycache__/


@ -293,7 +293,7 @@ ignored-parents=
max-args=5
# Maximum number of attributes for a class (see R0902).
max-attributes=7
max-attributes=20
# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
@ -302,7 +302,7 @@ max-bool-expr=5
max-branches=12
# Maximum number of locals for function / method body.
max-locals=15
max-locals=30
# Maximum number of parents for a class (see R0901).
max-parents=7
@ -402,7 +402,7 @@ preferred-modules=
# The type of string formatting that logging methods do. `old` means using %
# formatting, `new` is for `{}` formatting.
logging-format-style=old
logging-format-style=new
# Logging modules to check that the string format arguments are in logging
# function parameter format.
@ -440,7 +440,12 @@ disable=raw-checker-failed,
use-implicit-booleaness-not-comparison-to-string,
use-implicit-booleaness-not-comparison-to-zero,
import-error,
line-too-long
multiple-statements,
line-too-long,
logging-fstring-interpolation,
duplicate-code,
relative-beyond-top-level,
invalid-name
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option


@ -18,11 +18,10 @@ numpy = "*"
fastapi = "*"
pydantic = "*"
shapely = "*"
scipy = "*"
osmpythontools = "*"
pywikibot = "*"
pymemcache = "*"
fastapi-cli = "*"
scikit-learn = "*"
pyqt6 = "*"
loki-logger-handler = "*"
pulp = "*"
scipy = "*"

1835
backend/Pipfile.lock generated

File diff suppressed because it is too large

1094
backend/report.html Normal file

File diff suppressed because one or more lines are too long


@ -70,6 +70,6 @@ else:
MEMCACHED_HOST_PATH,
timeout=1,
allow_unicode_keys=True,
encoding='utf-8',
encoding='utf-8',
serde=serde.pickle_serde
)


@ -19,10 +19,9 @@ def configure_logging():
# in that case we want to log to stdout and also to loki
from loki_logger_handler.loki_logger_handler import LokiLoggerHandler
loki_url = os.getenv('LOKI_URL')
loki_url = "http://localhost:3100/loki/api/v1/push"
if loki_url is None:
raise ValueError("LOKI_URL environment variable is not set")
loki_handler = LokiLoggerHandler(
url = loki_url,
labels = {'app': 'anyway', 'environment': 'staging' if is_debug else 'production'}
@ -55,4 +54,3 @@ def configure_logging():
logging.getLogger('uvicorn').handlers = logging_handlers
logging.getLogger('uvicorn.access').handlers = logging_handlers
logging.getLogger('uvicorn.error').handlers = logging_handlers


@ -1,8 +1,9 @@
"""Main app for backend api"""
import logging
from fastapi import FastAPI, HTTPException, Query
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query
from .logging_config import configure_logging
from .structs.landmark import Landmark, Toilets
@ -11,8 +12,8 @@ from .structs.linked_landmarks import LinkedLandmarks
from .structs.trip import Trip
from .utils.landmarks_manager import LandmarkManager
from .utils.toilets_manager import ToiletsManager
from .utils.optimizer import Optimizer
from .utils.refiner import Refiner
from .optimization.optimizer import Optimizer
from .optimization.refiner import Refiner
from .cache import client as cache_client
logger = logging.getLogger(__name__)
@ -81,6 +82,7 @@ def new_trip(preferences: Preferences,
must_do=True,
n_tags=0)
start_time = time.time()
# Generate the landmarks from the start location
landmarks, landmarks_short = manager.generate_landmarks_list(
center_coordinates = start,
@ -91,22 +93,38 @@ def new_trip(preferences: Preferences,
landmarks_short.insert(0, start_landmark)
landmarks_short.append(end_landmark)
t_generate_landmarks = time.time() - start_time
logger.info(f'Fetched {len(landmarks)} landmarks in \t: {round(t_generate_landmarks,3)} seconds')
start_time = time.time()
# First stage optimization
try:
base_tour = optimizer.solve_optimization(preferences.max_time_minute, landmarks_short)
except ArithmeticError as exc:
raise HTTPException(status_code=500, detail="No solution found") from exc
except TimeoutError as exc:
raise HTTPException(status_code=500, detail="Optimization took too long") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Optimization failed: {str(exc)}") from exc
t_first_stage = time.time() - start_time
start_time = time.time()
# Second stage optimization
refined_tour = refiner.refine_optimization(landmarks, base_tour,
# TODO : only if necessary (not enough landmarks for ex.)
try :
refined_tour = refiner.refine_optimization(landmarks, base_tour,
preferences.max_time_minute,
preferences.detour_tolerance_minute)
except Exception as exc :
raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(exc)}") from exc
t_second_stage = time.time() - start_time
logger.debug(f'First stage optimization\t: {round(t_first_stage,3)} seconds')
logger.debug(f'Second stage optimization\t: {round(t_second_stage,3)} seconds')
logger.info(f'Total computation time\t: {round(t_first_stage + t_second_stage,3)} seconds')
linked_tour = LinkedLandmarks(refined_tour)
# upon creation of the trip, persistence of both the trip and its landmarks is ensured.
trip = Trip.from_linked_landmarks(linked_tour, cache_client)
logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.')
return trip
@ -165,7 +183,7 @@ def get_toilets(location: tuple[float, float] = Query(...), radius: int = 500) -
raise HTTPException(status_code=406, detail="Coordinates not provided or invalid")
if not (-90 <= location[0] <= 90 and -180 <= location[1] <= 180):
raise HTTPException(status_code=422, detail="Start coordinates not in range")
toilets_manager = ToiletsManager(location, radius)
try :


@ -0,0 +1,606 @@
"""Module responsible for sloving an MILP to find best tour around the given landmarks."""
import logging
from collections import defaultdict, deque
import yaml
import numpy as np
import pulp as pl
from ..structs.landmark import Landmark
from ..utils.get_time_distance import get_time
from ..constants import OPTIMIZER_PARAMETERS_PATH
# Silence the pulp logger
logging.getLogger('pulp').setLevel(level=logging.CRITICAL)
class Optimizer:
"""
Optimizes the balance between the efficiency of a tour and the inclusion of landmarks.
The `Optimizer` class is responsible for calculating the best possible detour adjustments
to a tour based on specific parameters such as detour time, walking speed, and the maximum
number of landmarks to visit. It helps refine a tour by determining whether adding additional
landmarks would significantly reduce the overall efficiency.
Responsibilities:
- Calculates the maximum detour time allowed for a given tour.
- Considers the detour factor, which accounts for real-world walking paths versus straight-line distance.
- Takes into account the average walking speed to estimate walking times.
- Limits the number of landmarks that can be added to the tour to prevent excessive detouring.
- Allows some overflow (overshoot) in the maximum detour time to accommodate for slight inefficiencies.
Attributes:
logger (logging.Logger): Logger for capturing relevant events and errors.
detour (int): The accepted maximum detour time in minutes.
detour_factor (float): The ratio between straight-line distance and actual walking distance in cities.
average_walking_speed (float): The average walking speed of an adult (in kilometers per hour).
max_landmarks (int): The maximum number of landmarks to include in the tour.
overshoot (float): The overshoot allowance for exceeding the maximum detour time in a restrictive manner.
"""
logger = logging.getLogger(__name__)
detour: int = None # accepted max detour time (in minutes)
detour_factor: float # detour factor of straight line vs real distance in cities
average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit
overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive
def __init__(self) :
# load parameters from file
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.detour_factor = parameters['detour_factor']
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks = parameters['max_landmarks']
self.overshoot = parameters['overshoot']
def init_ub_time(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark], max_time: int):
"""
Initialize the objective function and inequality constraints for the linear program.
This function sets up the objective to maximize the attractiveness of visiting landmarks,
while ensuring that the total time (including travel and visit duration) does not exceed
the maximum allowed time. It calculates the pairwise travel times between landmarks and
incorporates visit duration to form the inequality constraints.
The objective is to maximize sightseeing by selecting the most attractive landmarks within
the time limit.
Args:
prob (pl.LpProblem): The linear programming problem where constraints and the objective will be added.
x (pl.LpVariable): A decision variable representing whether a landmark is visited.
L (int): The number of landmarks.
landmarks (list[Landmark]): List of landmarks to visit.
max_time (int): Maximum allowable time for sightseeing, including travel and visit duration.
Returns:
None: Adds the objective function and constraints to the LP problem directly.
"""
L = len(landmarks)
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = np.zeros(L, dtype=np.int16)
# inequality matrix and vector
A_ub = np.zeros(L*L, dtype=np.int16)
b_ub = round(max_time*self.overshoot)
for i, spot1 in enumerate(landmarks) :
c[i] = spot1.attractiveness
for j in range(i+1, L) :
if i !=j :
t = get_time(spot1.location, landmarks[j].location)
A_ub[i*L + j] = t + spot1.duration
A_ub[j*L + i] = t + landmarks[j].duration
# Expand 'c' to L*L so there is one coefficient per decision variable
c = np.tile(c, L)
# Now sort and modify A_ub for each row
if L > 22 :
for i in range(L):
# Get indices of the 22 smallest values in row i
row_values = A_ub[i*L:i*L+L]
closest_indices = np.argpartition(row_values, 22)[:22]
# Create a mask for non-closest landmarks
mask = np.ones(L, dtype=bool)
mask[closest_indices] = False
# Set non-closest landmarks to 32765
row_values[mask] = 32765
A_ub[i*L:i*L+L] = row_values
# Add the objective and the 1 distance constraint
prob += pl.lpSum([c[j] * x[j] for j in range(L*L)])
prob += (pl.lpSum([A_ub[j] * x[j] for j in range(L*L)]) <= b_ub)
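For reference, a minimal sketch of the flattened indexing convention used throughout this module (values hypothetical): x[i*L + j] == 1 means the tour travels from landmark i to landmark j.

import numpy as np

L = 4                                  # hypothetical number of landmarks
x = np.zeros(L * L, dtype=np.int8)     # flattened decision vector, one entry per (i, j) pair
x[0 * L + 2] = 1                       # mark travel from landmark 0 to landmark 2
i, j = np.unravel_index(np.nonzero(x)[0][0], (L, L))
print(i, j)                            # -> 0 2, recovering the (from, to) pair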
def respect_number(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, max_landmarks: int):
"""
Generate constraints to ensure each landmark is visited at most once and cap the total number of visited landmarks.
This function adds the following constraints to the linear program:
1. Each landmark is visited at most once by creating L-2 constraints (one for each landmark).
2. The total number of visited landmarks is capped by the specified maximum number (`max_landmarks`) plus 2.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable indicating whether a landmark is visited.
L (int): The total number of landmarks.
max_landmarks (int): The maximum number of landmarks that can be visited.
Returns:
None: This function directly modifies the `prob` object by adding constraints.
"""
# L-2 constraints: each landmark is visited at most once
for i in range(1, L-1):
prob += (pl.lpSum([x[L*i + j] for j in range(L)]) <= 1)
# 1 constraint: cap the total number of visits
prob += (pl.lpSum([1 * x[j] for j in range(L*L)]) <= max_landmarks+2)
def break_sym(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to prevent simultaneous travel between two landmarks
in both directions. This constraint ensures that, for any pair of landmarks,
travel from landmark i to landmark j (dij) and travel from landmark j to landmark i (dji)
cannot happen simultaneously.
This method adds constraints to break symmetry, specifically to prevent
cyclic paths with only two elements. It does not prevent cyclic paths involving more than two elements.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding constraints in-place.
"""
upper_ind = np.triu_indices(L, 0, L) # Get the upper triangular indices
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
# Loop over the upper triangular indices, excluding diagonal elements
for i, up_ind in enumerate(up_ind_x):
if up_ind != up_ind_y[i]:
# Add (L*L-L)/2 constraints to break symmetry
prob += (x[up_ind*L + up_ind_y[i]] + x[up_ind_y[i]*L + up_ind] <= 1)
def init_eq_not_stay(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to prevent staying at the same position during travel.
Specifically, it removes travel from a landmark to itself (e.g., d11, d22, d33, etc.).
This function adds one equality constraint to the optimization problem that ensures
no decision variable corresponding to staying at the same landmark is included
in the solution. This helps in ensuring that the path does not include self-loops.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding an equality constraint in-place.
"""
A_eq = np.zeros((L, L), dtype=np.int8)
# Set diagonal elements to 1 (to prevent staying in the same position)
np.fill_diagonal(A_eq, 1)
A_eq = A_eq.flatten()
# First equality constraint
prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 0)
def respect_start_finish(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to ensure that the optimization starts at the designated
start landmark and finishes at the goal landmark.
Specifically, this function adds three equality constraints:
1. Ensures that the path starts at the designated start landmark (row 0).
2. Ensures that the path finishes at the designated goal landmark (row 1).
3. Prevents any arrivals at the start landmark or departures from the goal landmark (row 2).
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding three equality constraints in-place.
"""
# Fill-in row 0.
A_eq = np.zeros((3,L*L), dtype=np.int8)
A_eq[0, :L] = np.ones(L, dtype=np.int8) # sets departures only for start (horizontal ones)
for k in range(L-1) :
if k != 0 :
# Fill-in row 1
A_eq[1, k*L+L-1] = 1 # sets arrivals only for finish (vertical ones)
# Fill-in row 2
A_eq[2, k*L] = 1
A_eq[2, L*(L-1):] = np.ones(L, dtype=np.int8) # prevents arrivals at start and departures from goal
b_eq= [1, 1, 0]
# Add the constraints to pulp
for i in range(3) :
prob += (pl.lpSum([A_eq[i][j] * x[j] for j in range(L*L)]) == b_eq[i])
def respect_order(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to tie the optimization problem together and prevent
stacked ones, although this does not fully prevent circles.
This function adds constraints to the optimization problem that prevent
simultaneous travel between landmarks in a way that would result in stacked ones.
However, it does not fully prevent circular paths.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding L-2 equality constraints in-place.
"""
# FIXME: weird 0 artifact in the coefficients popping up
# Loop through rows 1 to L-2 to prevent stacked ones
for i in range(1, L-1):
# Add the constraint that sums across each "row" or "block" in the decision variables
row_sum = -pl.lpSum(x[i + j*L] for j in range(L)) + pl.lpSum(x[i*L:(i+1)*L])
prob += (row_sum == 0)
def respect_user_must(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
This function adds constraints to the optimization problem to ensure that landmarks marked as
'must_do' are included in the solution. It precomputes the constraints and adds them to the
problem accordingly.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_do'.
Returns:
None: This function modifies the `prob` object by adding equality constraints in-place.
"""
ones = np.ones(L, dtype=np.int8)
A_eq = np.zeros(L*L, dtype=np.int8)
for i, elem in enumerate(landmarks) :
if elem.must_do is True and i not in [0, L-1]:
A_eq[i*L:i*L+L] = ones
prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 1)
if elem.must_avoid is True and i not in [0, L-1]:
A_eq[i*L:i*L+L] = ones
prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 2)
def prevent_circle(self, prob: pl.LpProblem, x: pl.LpVariable, circle_vertices: list, L: int) :
"""
Prevent circular paths by adding constraints to the optimization.
This function ensures that circular paths in both directions (i.e., forward and reverse)
between landmarks are avoided in the optimization problem by adding the corresponding constraints.
Args:
prob (pl.LpProblem): The linear programming problem instance to which the constraints will be added.
x (pl.LpVariable): Decision variable representing the travel between landmarks in the problem.
circle_vertices (list): List of indices representing the landmarks that form a circular path.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding two equality constraints that
prevent circular paths in both directions for the specified circle vertices.
"""
l = np.zeros((2, L*L), dtype=np.int8)
for i, node in enumerate(circle_vertices[:-1]) :
next = circle_vertices[i+1]
l[0, node*L + next] = 1
l[1, next*L + node] = 1
s = circle_vertices[0]
g = circle_vertices[-1]
l[0, g*L + s] = 1
l[1, s*L + g] = 1
# Add the constraints
prob += (pl.lpSum([l[0][j] * x[j] for j in range(L*L)]) == 0)
prob += (pl.lpSum([l[1][j] * x[j] for j in range(L*L)]) == 0)
def is_connected(self, resx) :
"""
Determine the order of visits and detect any circular paths in the given configuration.
Args:
resx (list): List of edge weights.
Returns:
Optional[list[list[int]]]: A list of the detected circular journeys, or None if the tour is fully connected.
"""
resx = np.round(resx).astype(np.int8) # round all elements and cast them to int
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0]
ind_b = nonzero_tup[1]
# Extract all journeys
all_journeys_nodes = []
visited_nodes = set()
for node in ind_a:
if node not in visited_nodes:
journey_nodes = self.get_journey(node, ind_a, ind_b)
all_journeys_nodes.append(journey_nodes)
visited_nodes.update(journey_nodes)
for l in all_journeys_nodes :
if 0 in l :
all_journeys_nodes.remove(l)
break
if not all_journeys_nodes :
return None
return all_journeys_nodes
def get_journey(self, start, ind_a, ind_b):
"""
Trace the journey starting from a given node and follow the connections between landmarks.
This method constructs a graph from two lists of landmark connections, `ind_a` and `ind_b`,
where each element in `ind_a` is connected to the corresponding element in `ind_b`.
It then performs a depth-first search (DFS) starting from the `start` node to determine
the path (journey) by following the connections.
Args:
start (int): The starting node of the journey.
ind_a (list[int]): List of "from" nodes, representing the starting points of each connection.
ind_b (list[int]): List of "to" nodes, representing the endpoints of each connection.
Returns:
list[int]: A list of nodes representing the order of the journey, starting from the `start` node.
Example:
If `ind_a = [0, 1, 2]` and `ind_b = [1, 2, 3]`, starting from node 0, the journey would be `[0, 1, 2, 3]`.
"""
graph = defaultdict(list)
for a, b in zip(ind_a, ind_b):
graph[a].append(b)
journey_nodes = []
visited = set()
stack = deque([start])
while stack:
node = stack.pop()
if node not in visited:
visited.add(node)
journey_nodes.append(node)
for neighbor in graph[node]:
if neighbor not in visited:
stack.append(neighbor)
return journey_nodes
def get_order(self, resx):
"""
Determine the order of visits given the result of the optimization.
Args:
resx (list): List of edge weights.
Returns:
list[int]: A list containing the visit order.
"""
resx = np.round(resx).astype(np.uint8) # must contain only 0 and 1
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
ind_b = nonzero_tup[1].tolist()
order = [0]
current = 0
used_indices = set() # Track visited index pairs
while True:
# Find index of the current node in ind_a
try:
i = ind_a.index(current)
except ValueError:
break # No more links, stop the search
if i in used_indices:
break # Prevent infinite loops
used_indices.add(i) # Mark this index as visited
next_node = ind_b[i] # Get the corresponding node in ind_b
order.append(next_node) # Add it to the path
# Switch roles, now look for next_node in ind_a
current = next_node
return order
def link_list(self, order: list[int], landmarks: list[Landmark])->list[Landmark] :
"""
Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times.
Args:
order (list[int]): List of indices representing the order of landmarks to visit.
landmarks (list[Landmark]): List of all landmarks.
Returns:
list[Landmark]: The updated linked list of landmarks with travel times.
"""
L = []
j = 0
while j < len(order)-1 :
# get landmarks involved
elem = landmarks[order[j]]
next = landmarks[order[j+1]]
# get attributes
elem.time_to_reach_next = get_time(elem.location, next.location)
elem.must_do = True
elem.location = (round(elem.location[0], 5), round(elem.location[1], 5))
elem.next_uuid = next.uuid
L.append(elem)
j += 1
next.location = (round(next.location[0], 5), round(next.location[1], 5))
next.must_do = True
L.append(next)
return L
def pre_processing(self, L: int, landmarks: list[Landmark], max_time: int, max_landmarks: int | None) :
"""
Preprocesses the optimization problem by setting up constraints and variables for the tour optimization.
This method initializes and prepares the linear programming problem to optimize a tour that includes landmarks,
while respecting various constraints such as time limits, the number of landmarks to visit, and user preferences.
The pre-processing step sets up the problem before solving it using a linear programming solver.
Responsibilities:
- Defines the optimization problem using linear programming (LP) with the objective to maximize the tour value.
- Creates binary decision variables for each potential transition between landmarks.
- Sets up inequality constraints to respect the maximum time available for the tour and the maximum number of landmarks.
- Implements equality constraints to ensure the tour respects the start and finish positions, avoids staying in the same place,
and adheres to a visit order.
- Forces inclusion or exclusion of specific landmarks based on user preferences.
Attributes:
prob (pl.LpProblem): The linear programming problem to be solved.
x (list): A list of binary variables representing transitions between landmarks.
L (int): The total number of landmarks considered in the optimization.
landmarks (list[Landmark]): The list of landmarks to be visited in the tour.
max_time (int): The maximum allowable time for the entire tour.
max_landmarks (int | None): The maximum number of landmarks to visit in the tour, or None if no limit is set.
Returns:
prob (pl.LpProblem): The linear programming problem setup for optimization.
x (list): The list of binary variables for transitions between landmarks in the tour.
"""
if max_landmarks is None :
max_landmarks = self.max_landmarks
# Initalize the optimization problem
prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize)
# Define the problem
x_bounds = [(0, 1)]*L*L
x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)]
# Setup the inequality constraints
self.init_ub_time(prob, x, L, landmarks, max_time) # Adds the distances from each landmark to the other.
self.respect_number(prob, x, L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks).
self.break_sym(prob, x, L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger circles.
# Setup the equality constraints
self.init_eq_not_stay(prob, x, L) # Force solution not to stay in same place
self.respect_start_finish(prob, x, L) # Force start and finish positions
self.respect_order(prob, x, L) # Respect order of visit (only works when max_time is limiting factor)
self.respect_user_must(prob, x, L, landmarks) # Force to do/avoid landmarks set by user.
return prob, x
def solve_optimization(self, max_time: int, landmarks: list[Landmark], max_landmarks: int = None) -> list[Landmark]:
"""
Main optimization pipeline to solve the landmark visiting problem.
This method sets up and solves a linear programming problem with constraints to find an optimal tour of landmarks,
considering user-defined must-visit landmarks, start and finish points, and ensuring no cycles are present.
Args:
max_time (int): Maximum time allowed for the tour in minutes.
landmarks (list[Landmark]): List of landmarks to visit.
max_landmarks (int): Maximum number of landmarks visited
Returns:
list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found.
"""
# Setup the optimization problem.
L = len(landmarks)
prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks)
# Solve the problem and extract results.
prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1))
status = pl.LpStatus[prob.status]
solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
self.logger.debug("First results are out. Looking out for circles and correcting.")
# Raise error if no solution is found. FIXME: for now this throws the internal server error
if status != 'Optimal' :
self.logger.error("The problem is overconstrained, no solution on first try.")
raise ArithmeticError("No solution could be found. Please try again with more time or different preferences.")
# If there is a solution, we're good to go, just check for connectiveness
circles = self.is_connected(solution)
i = 0
timeout = 40
while circles is not None :
i += 1
if i == timeout :
self.logger.error(f'Timeout: No solution found after {timeout} iterations.')
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
for circle in circles :
self.prevent_circle(prob, x, circle, L)
# Solve the problem again
prob.solve(pl.PULP_CBC_CMD(msg=False))
solution = [pl.value(var) for var in x]
if pl.LpStatus[prob.status] != 'Optimal' :
self.logger.error("The problem is overconstrained, no solution after {i} cycles.")
raise ArithmeticError("No solution could be found. Please try again with more time or different preferences.")
circles = self.is_connected(solution)
if circles is None :
break
# Sort the landmarks in the order of the solution
order = self.get_order(solution)
tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
return tour
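A minimal usage sketch (a sketch only: import paths follow main.py above, and the Landmark fields mirror the model shown in the deleted clustering script at the end of this diff; coordinates are hypothetical):

from src.optimization.optimizer import Optimizer   # package path assumed
from src.structs.landmark import Landmark          # package path assumed

start = Landmark(name='start', type='start', location=(48.08, 7.35),
                 osm_type='node', osm_id=0, attractiveness=0, n_tags=0, must_do=True)
museum = Landmark(name='museum', type='sightseeing', location=(48.079, 7.358),
                  osm_type='way', osm_id=1, attractiveness=30, n_tags=10, duration=25)
finish = Landmark(name='finish', type='finish', location=(48.081, 7.352),
                  osm_type='node', osm_id=0, attractiveness=0, n_tags=0, must_do=True)

optimizer = Optimizer()                # parameters come from the YAML file shown below
tour = optimizer.solve_optimization(max_time=120, landmarks=[start, museum, finish])
for lm in tour:
    print(lm.name, lm.time_to_reach_next)   # ordered tour with travel times filled in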


@ -1,23 +1,32 @@
import yaml, logging
from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull
"""Allows to refine the tour by adding more landmarks and making the path easier to follow."""
import logging
from math import pi
import yaml
from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull
from ..structs.landmark import Landmark
from . import take_most_important, get_time_separation
from ..utils.get_time_distance import get_time
from ..utils.take_most_important import take_most_important
from .optimizer import Optimizer
from ..constants import OPTIMIZER_PARAMETERS_PATH
class Refiner :
"""
Refines a tour by incorporating smaller landmarks along the path to enhance the experience.
This class is designed to adjust an existing tour by considering additional,
smaller points of interest (landmarks) that may require minor detours but
improve the overall quality of the tour. It balances the efficiency of travel
with the added value of visiting these landmarks.
"""
logger = logging.getLogger(__name__)
detour_factor: float # detour factor of straight line vs real distance in cities
detour_corridor_width: float # width of the corridor around the path
average_walking_speed: float # average walking speed of adult
max_landmarks_refiner: int # max number of landmarks to visit
max_landmarks_refiner: int # max number of landmarks to visit
optimizer: Optimizer # optimizer object
def __init__(self, optimizer: Optimizer) :
@ -45,7 +54,7 @@ class Refiner :
"""
corrected_width = (180*width)/(6371000*pi)
path = self.create_linestring(landmarks)
obj = buffer(path, corrected_width, join_style="mitre", cap_style="square", mitre_limit=2)
@ -70,7 +79,7 @@ class Refiner :
return LineString(points)
# Check if some coordinates are in area. Used for the corridor
# Check if some coordinates are in area. Used for the corridor
def is_in_area(self, area: Polygon, coordinates) -> bool :
"""
Check if a given point is within a specified area.
@ -86,7 +95,7 @@ class Refiner :
return point.within(area)
# Function to determine if two landmarks are close to each other
# Function to determine if two landmarks are close to each other
def is_close_to(self, location1: tuple[float], location2: tuple[float]):
"""
Determine if two locations are close to each other by rounding their coordinates to 3 decimal places.
@ -119,7 +128,7 @@ class Refiner :
Returns:
list[Landmark]: The rearranged list of landmarks with grouped nearby visits.
"""
i = 1
while i < len(tour):
j = i+1
@ -131,9 +140,9 @@ class Refiner :
break # Move to the next i-th element after rearrangement
j += 1
i += 1
return tour
def integrate_landmarks(self, sub_list: list[Landmark], main_list: list[Landmark]) :
"""
Inserts 'sub_list' of Landmarks inside the 'main_list' by leaving the ends untouched.
@ -166,27 +175,27 @@ class Refiner :
should be visited, and the second element is a `Polygon` representing
the path connecting all landmarks.
"""
# Step 1: Find 'start' and 'finish' landmarks
start_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'start')
finish_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'finish')
start_landmark = landmarks[start_idx]
finish_landmark = landmarks[finish_idx]
# Step 2: Create a list of unvisited landmarks excluding 'start' and 'finish'
unvisited_landmarks = [lm for i, lm in enumerate(landmarks) if i not in [start_idx, finish_idx]]
# Step 3: Initialize the path with the 'start' landmark
path = [start_landmark]
coordinates = [landmarks[start_idx].location]
current_landmark = start_landmark
# Step 4: Use nearest neighbor heuristic to visit all landmarks
while unvisited_landmarks:
nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time_separation.get_time(current_landmark.location, lm.location))
nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time(current_landmark.location, lm.location))
path.append(nearest_landmark)
coordinates.append(nearest_landmark.location)
current_landmark = nearest_landmark
@ -224,12 +233,12 @@ class Refiner :
for visited in visited_landmarks :
visited_names.append(visited.name)
for landmark in all_landmarks :
if self.is_in_area(area, landmark.location) and landmark.name not in visited_names:
second_order_landmarks.append(landmark)
return take_most_important.take_most_important(second_order_landmarks, int(self.max_landmarks_refiner*0.75))
return take_most_important(second_order_landmarks, int(self.max_landmarks_refiner*0.75))
# Try fix the shortest path using shapely
@ -256,7 +265,7 @@ class Refiner :
coords_dict[landmark.location] = landmark
tour_poly = Polygon(coords)
better_tour_poly = tour_poly.buffer(0)
try :
xs, ys = better_tour_poly.exterior.xy
@ -265,7 +274,7 @@ class Refiner :
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
except :
except Exception:
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
"""
@ -299,7 +308,7 @@ class Refiner :
# Rearrange only if polygon still not simple
if not better_tour_poly.is_simple :
better_tour = self.rearrange(better_tour)
return better_tour
@ -330,10 +339,10 @@ class Refiner :
# No need to refine if no detour is taken
# if detour == 0:
# return base_tour
minor_landmarks = self.get_minor_landmarks(all_landmarks, base_tour, self.detour_corridor_width)
self.logger.info(f"Using {len(minor_landmarks)} minor landmarks around the predicted path")
self.logger.debug(f"Using {len(minor_landmarks)} minor landmarks around the predicted path")
# Full set of visitable landmarks.
full_set = self.integrate_landmarks(minor_landmarks, base_tour) # could probably be optimized with less overhead
@ -341,7 +350,7 @@ class Refiner :
# Generate a new tour with the optimizer.
new_tour = self.optimizer.solve_optimization(
max_time = max_time + detour,
landmarks = full_set,
landmarks = full_set,
max_landmarks = self.max_landmarks_refiner
)
@ -357,7 +366,7 @@ class Refiner :
# Find shortest path using the nearest neighbor heuristic.
better_tour, better_poly = self.find_shortest_path_through_all_landmarks(new_tour)
# Fix the tour using Polygons if the path looks weird.
# Fix the tour using Polygons if the path looks weird.
# Conditions : circular trip and invalid polygon.
if base_tour[0].location == base_tour[-1].location and not better_poly.is_valid :
better_tour = self.fix_using_polygon(better_tour)
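A minimal usage sketch tying the refiner to the optimizer (positional arguments mirror the call in main.py above; the numbers are hypothetical):

refiner = Refiner(optimizer=optimizer)                   # reuses the Optimizer instance
refined_tour = refiner.refine_optimization(landmarks,    # full set, including minor landmarks
                                           base_tour,    # first-stage tour from the optimizer
                                           120,          # max_time in minutes
                                           10)           # detour tolerance in minutes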


@ -0,0 +1,140 @@
"""Module defining the caching strategy for overpass requests."""
import os
import xml.etree.ElementTree as ET
import hashlib
from ..constants import OSM_CACHE_DIR
def get_cache_key(query: str) -> str:
"""
Generate a unique cache key for the query using a hash function.
This ensures that queries with different parameters are cached separately.
"""
return hashlib.md5(query.encode('utf-8')).hexdigest()
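The key is simply the MD5 hex digest of the raw query text, so identical queries always map to the same cache file. A quick check (the query string is hypothetical):

import hashlib

query = '(node["shop"="clothes"](around:1000, 48.08, 7.35););out center;'
key = hashlib.md5(query.encode('utf-8')).hexdigest()
print(key)   # deterministic 32-character hex string, used below as '<key>.xml'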
class CachingStrategyBase:
"""
Base class for implementing caching strategies.
This class defines the structure for a caching strategy with basic methods
that must be implemented by subclasses. Subclasses should define how to
retrieve, store, and close the cache.
"""
def get(self, key):
"""Retrieve the cached data associated with the provided key."""
raise NotImplementedError('Subclass should implement get')
def set(self, key, value):
"""Store data in the cache with the specified key."""
raise NotImplementedError('Subclass should implement set')
def close(self):
"""Clean up or close any resources used by the caching strategy."""
class XMLCache(CachingStrategyBase):
"""
A caching strategy that stores and retrieves data in XML format.
This class provides methods to cache data as XML files in a specified directory.
The directory is automatically suffixed with '_XML' to distinguish it from other
caching strategies. The data is stored and retrieved using XML serialization.
Args:
cache_dir (str): The base directory where XML cache files will be stored.
Defaults to 'OSM_CACHE_DIR' with a '_XML' suffix.
Methods:
get(key): Retrieve cached data from an XML file associated with the given key.
set(key, value): Store data in an XML file with the specified key.
"""
def __init__(self, cache_dir=OSM_CACHE_DIR):
# Add the class name as a suffix to the directory
self._cache_dir = f'{cache_dir}_XML'
if not os.path.exists(self._cache_dir):
os.makedirs(self._cache_dir)
def _filename(self, key):
return os.path.join(self._cache_dir, f'{key}.xml')
def get(self, key):
"""Retrieve XML data from the cache and parse it as an ElementTree."""
filename = self._filename(key)
if os.path.exists(filename):
try:
# Parse and return the cached XML data
tree = ET.parse(filename)
return tree.getroot() # Return the root element of the parsed XML
except ET.ParseError:
# print(f"Error parsing cached XML file: {filename}")
return None
return None
def set(self, key, value):
"""Save the XML data as an ElementTree to the cache."""
filename = self._filename(key)
tree = ET.ElementTree(value) # value is expected to be an ElementTree root element
try:
# Write the XML data to a file
with open(filename, 'wb') as file:
tree.write(file, encoding='utf-8', xml_declaration=True)
except IOError as e:
raise IOError(f"Error writing to cache file: {filename} - {e}") from e
class CachingStrategy:
"""
A class to manage different caching strategies.
This class provides an interface to switch between different caching strategies
(e.g., XMLCache, JSONCache) dynamically. It allows caching data in different formats,
depending on the strategy being used. By default, it uses the XMLCache strategy.
Attributes:
__strategy (CachingStrategyBase): The currently active caching strategy.
__strategies (dict): A mapping between strategy names (as strings) and their corresponding
classes, allowing dynamic selection of caching strategies.
"""
__strategy = XMLCache() # Default caching strategy
__strategies = {
'XML': XMLCache,
}
@classmethod
def use(cls, strategy_name='XML', **kwargs):
"""
Set the caching strategy based on the strategy_name provided.
Args:
strategy_name (str): The name of the caching strategy (e.g., 'XML').
**kwargs: Additional keyword arguments to pass when initializing the strategy.
"""
# If a previous strategy exists, close it
if cls.__strategy:
cls.__strategy.close()
# Retrieve the strategy class based on the strategy name
strategy_class = cls.__strategies.get(strategy_name)
if not strategy_class:
raise ValueError(f"Unknown caching strategy: {strategy_name}")
# Instantiate the new strategy with the provided arguments
cls.__strategy = strategy_class(**kwargs)
return cls.__strategy
@classmethod
def get(cls, key):
"""Get data from the current strategy's cache."""
if not cls.__strategy:
raise RuntimeError("Caching strategy has not been set.")
return cls.__strategy.get(key)
@classmethod
def set(cls, key, value):
"""Set data in the current strategy's cache."""
if not cls.__strategy:
raise RuntimeError("Caching strategy has not been set.")
cls.__strategy.set(key, value)
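A minimal usage sketch of the strategy interface (cache directory hypothetical; values must be ElementTree root elements, as XMLCache requires):

import xml.etree.ElementTree as ET
# assuming: from .caching_strategy import CachingStrategy, get_cache_key

CachingStrategy.use('XML', cache_dir='cache')   # select and initialize the XML backend
key = get_cache_key('some overpass query')
root = ET.Element('osm')                        # stand-in for a parsed Overpass response
CachingStrategy.set(key, root)                  # written to 'cache_XML/<key>.xml'
print(CachingStrategy.get(key))                 # parsed root element on hit, None on miss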


@ -0,0 +1,171 @@
"""Module allowing connexion to overpass api and fectch data from OSM."""
from typing import Literal, List
import urllib.error
import urllib.parse
import urllib.request
import logging
import xml.etree.ElementTree as ET
from .caching_strategy import get_cache_key, CachingStrategy
from ..constants import OSM_CACHE_DIR
logger = logging.getLogger('Overpass')
osm_types = List[Literal['way', 'node', 'relation']]
class Overpass :
"""
Overpass class to manage the query building and sending to overpass api.
The caching strategy is a part of this class and initialized upon creation of the Overpass object.
"""
def __init__(self, caching_strategy: str = 'XML', cache_dir: str = OSM_CACHE_DIR) :
"""
Initialize the Overpass instance with the url, headers and caching strategy.
"""
self.overpass_url = "https://overpass-api.de/api/interpreter"
self.headers = {'User-Agent': 'Mozilla/5.0 (compatible; OverpassQuery/1.0; +http://example.com)',}
self.caching_strategy = CachingStrategy.use(caching_strategy, cache_dir=cache_dir)
def build_query(self, area: tuple, osm_types: osm_types,
selector: str, conditions=None, out='center') -> str:
"""
Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data.
Args:
area (tuple): A tuple representing the geographical search area, typically in the format
(radius, latitude, longitude). The first element is a string like "around:2000"
specifying the search radius, and the second and third elements represent
the latitude and longitude as floats or strings.
osm_types (list[str]): A list of OSM element types to search for. Must be one or more of
'Way', 'Node', or 'Relation'.
selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.).
conditions (list, optional): A list of conditions to apply as additional filters for the
selected OSM elements. The conditions should be written in
the Overpass QL format, and they are combined with '&&' if
multiple are provided. Defaults to an empty list.
out (str, optional): Specifies the output type, such as 'center', 'body', or 'tags'.
Defaults to 'center'.
Returns:
str: The constructed Overpass QL query string.
Notes:
- If no conditions are provided, the query will just use the `selector` to filter the OSM
elements without additional constraints.
- The search area must always be formatted as "(radius, lat, lon)".
"""
if conditions is None :
conditions = []
elif not isinstance(conditions, list) :
conditions = [conditions]
if not isinstance(osm_types, list) :
osm_types = [osm_types]
query = '('
# Round the radius to nearest 50 and coordinates to generate less queries
if area[0] > 500 :
search_radius = round(area[0] / 50) * 50
loc = tuple((round(area[1], 2), round(area[2], 2)))
else :
search_radius = round(area[0] / 25) * 25
loc = tuple((round(area[1], 3), round(area[2], 3)))
search_area = f"(around:{search_radius}, {str(loc[0])}, {str(loc[1])})"
if conditions :
conditions = '(if: ' + ' && '.join(conditions) + ')'
else :
conditions = ''
for elem in osm_types :
query += elem + '[' + selector + ']' + conditions + search_area + ';'
query += ');' + f'out {out};'
return query
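For instance, with hypothetical arguments the builder yields a single compact Overpass QL statement, with the radius and coordinates rounded as above so that nearby requests share a cache entry:

query = Overpass().build_query(
    area=(1000, 48.08, 7.35),
    osm_types=['node'],
    selector='"shop"="clothes"',
)
# -> '(node["shop"="clothes"](around:1000, 48.08, 7.35););out center;'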
def send_query(self, query: str) -> ET.Element:
"""
Sends the Overpass QL query to the Overpass API and returns the parsed XML response.
Args:
query (str): The Overpass QL query to be sent to the Overpass API.
Returns:
ET.Element: The root element of the parsed XML response, served from the cache when available.
"""
# Generate a cache key for the current query
cache_key = get_cache_key(query)
# Try to fetch the result from the cache
cached_response = self.caching_strategy.get(cache_key)
if cached_response is not None :
logger.debug("Cache hit.")
return cached_response
# Prepare the data to be sent as POST request, encoded as bytes
data = urllib.parse.urlencode({'data': query}).encode('utf-8')
try:
# Create a Request object with the specified URL, data, and headers
request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
# Send the request and read the response
with urllib.request.urlopen(request) as response:
# Read and decode the response
response_data = response.read().decode('utf-8')
root = ET.fromstring(response_data)
# Cache the response data as an ElementTree root
self.caching_strategy.set(cache_key, root)
logger.debug("Response data added to cache.")
return root
except urllib.error.URLError as e:
raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
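Putting the two methods together, a round-trip sketch (hypothetical coordinates; the first call needs network access, repeat calls are served from the XML cache):

overpass = Overpass()                    # XML caching in OSM_CACHE_DIR by default
query = overpass.build_query(area=(500, 45.75, 4.85),
                             osm_types=['node', 'way'],
                             selector='"tourism"="museum"')
root = overpass.send_query(query)        # ET.Element root of the Overpass response
print(len(list(root)))                   # number of top-level OSM elements returned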
def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :
"""
Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element.
This function retrieves the latitude and longitude coordinates, OSM ID, and optionally the name
of a given OpenStreetMap (OSM) element. It handles different OSM types (e.g., 'node', 'way') by
extracting coordinates either directly or from a center tag, depending on the element type.
Args:
elem (ET.Element): The XML element representing the OSM entity.
osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates
are extracted directly from the element; otherwise, from the 'center' tag.
with_name (bool): Whether to extract and return the name of the element. If True, it attempts
to find the 'name' tag within the element and return its value. Defaults to False.
Returns:
tuple: A tuple containing:
- osm_id (str): The OSM ID of the element.
- coords (tuple): A tuple of (latitude, longitude) coordinates.
- name (str, optional): The name of the element if `with_name` is True; otherwise, not included.
"""
# 1. extract coordinates
if osm_type != 'node' :
center = elem.find('center')
lat = float(center.get('lat'))
lon = float(center.get('lon'))
else :
lat = float(elem.get('lat'))
lon = float(elem.get('lon'))
coords = tuple((lat, lon))
# 2. Extract OSM id
osm_id = elem.get('id')
# 3. Extract name if specified and return
if with_name :
name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None
return osm_id, coords, name
else :
return osm_id, coords
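A small sketch of how get_base_info consumes an element (the XML snippet is hypothetical but follows the 'out center' format produced by the queries above):

import xml.etree.ElementTree as ET
# assuming: from .overpass import get_base_info

elem = ET.fromstring('<way id="42"><center lat="48.08" lon="7.35"/><tag k="name" v="Musee"/></way>')
osm_id, coords, name = get_base_info(elem, 'way', with_name=True)
print(osm_id, coords, name)   # -> 42 (48.08, 7.35) Musee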


@ -51,25 +51,26 @@ sightseeing:
- place_of_worship
- fountain
- townhall
water:
- reflecting_pool
water: reflecting_pool
bridge:
- aqueduct
- viaduct
- boardwalk
- cantilever
- abandoned
building:
- church
- chapel
- mosque
- synagogue
- ruins
- temple
- government
- cathedral
- castle
- museum
building: cathedral
# unused sightseeing/buildings:
# - church
# - chapel
# - mosque
# - synagogue
# - ruins
# - temple
# - government
# - cathedral
# - castle
# - museum
museums:
tourism:


@ -4,9 +4,9 @@ church_coeff: 0.65
nature_coeff: 1.35
overall_coeff: 10
tag_exponent: 1.15
image_bonus: 10
image_bonus: 1.1
viewpoint_bonus: 5
wikipedia_bonus: 4
wikipedia_bonus: 1.1
name_bonus: 3
N_important: 40
pay_bonus: -1


@ -2,5 +2,5 @@ detour_factor: 1.4
detour_corridor_width: 300
average_walking_speed: 4.8
max_landmarks: 10
max_landmarks_refiner: 30
max_landmarks_refiner: 20
overshoot: 1.1

File diff suppressed because it is too large


@ -1,698 +0,0 @@
{
"type": "FeatureCollection",
"generator": "overpass-turbo",
"copyright": "The data included in this document is from www.openstreetmap.org. The data is made available under ODbL.",
"timestamp": "2024-12-02T21:14:59Z",
"features": [
{
"type": "Feature",
"properties": {
"@id": "node/1345741798",
"name": "Cordonnerie Saint-Joseph",
"shop": "shoes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3481705,
48.0816462
]
},
"id": "node/1345741798"
},
{
"type": "Feature",
"properties": {
"@id": "node/2659184738",
"brand": "Armand Thiery",
"brand:wikidata": "Q2861975",
"brand:wikipedia": "fr:Armand Thiery",
"name": "Armand Thiery",
"opening_hours": "Mo-Sa 09:30-19:00",
"shop": "clothes",
"wheelchair": "limited"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3594454,
48.0785574
]
},
"id": "node/2659184738"
},
{
"type": "Feature",
"properties": {
"@id": "node/3618136290",
"name": "Chez Dominique",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3362362,
48.0712174
]
},
"id": "node/3618136290"
},
{
"type": "Feature",
"properties": {
"@id": "node/3618136605",
"name": "Divamod",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3304253,
48.0782989
]
},
"id": "node/3618136605"
},
{
"type": "Feature",
"properties": {
"@id": "node/3618284507",
"name": "Star tendances et voyages",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3474029,
48.0830993
]
},
"id": "node/3618284507"
},
{
"type": "Feature",
"properties": {
"@id": "node/3619696125",
"brand": "Zeeman",
"brand:wikidata": "Q184399",
"name": "Zeeman",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3413834,
48.0638444
]
},
"id": "node/3619696125"
},
{
"type": "Feature",
"properties": {
"@id": "node/4594398129",
"name": "Miss et Mister",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3308309,
48.0779118
]
},
"id": "node/4594398129"
},
{
"type": "Feature",
"properties": {
"@id": "node/4907320441",
"brand": "Sergent Major",
"brand:wikidata": "Q62521738",
"clothes": "babies;children",
"name": "Sergent Major",
"opening_hours": "Mo-Sa 09:30-19:00",
"shop": "clothes",
"wheelchair": "no"
},
"geometry": {
"type": "Point",
"coordinates": [
7.359116,
48.0787229
]
},
"id": "node/4907320441"
},
{
"type": "Feature",
"properties": {
"@id": "node/4907364791",
"brand": "Armand Thiery",
"brand:wikidata": "Q2861975",
"brand:wikipedia": "fr:Armand Thiery",
"clothes": "women",
"name": "Armand Thiery",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3601857,
48.0783373
]
},
"id": "node/4907364791"
},
{
"type": "Feature",
"properties": {
"@id": "node/4907385675",
"check_date": "2024-05-19",
"clothes": "children",
"name": "Du Pareil...au même",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3604521,
48.0779726
]
},
"id": "node/4907385675"
},
{
"type": "Feature",
"properties": {
"@id": "node/4922191645",
"name": "Abilos",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3566167,
48.0794136
]
},
"id": "node/4922191645"
},
{
"type": "Feature",
"properties": {
"@id": "node/4922191648",
"brand": "Esprit",
"brand:wikidata": "Q532746",
"brand:wikipedia": "en:Esprit Holdings",
"name": "Esprit",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3554004,
48.0787549
]
},
"id": "node/4922191648"
},
{
"type": "Feature",
"properties": {
"@id": "node/4922191972",
"brand": "Guess",
"brand:wikidata": "Q2470307",
"brand:wikipedia": "en:Guess (clothing)",
"name": "Guess",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.355273,
48.0788003
]
},
"id": "node/4922191972"
},
{
"type": "Feature",
"properties": {
"@id": "node/4922192001",
"name": "Lingerie",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3575453,
48.0779317
]
},
"id": "node/4922192001"
},
{
"type": "Feature",
"properties": {
"@id": "node/5359915869",
"name": "Al Assil",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3305665,
48.0780902
]
},
"id": "node/5359915869"
},
{
"type": "Feature",
"properties": {
"@id": "node/9089360040",
"brand": "Grain de Malice",
"brand:wikidata": "Q66757157",
"clothes": "women",
"name": "Grain de Malice",
"shop": "clothes",
"short_name": "GDM"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3593125,
48.0786234
]
},
"id": "node/9089360040"
},
{
"type": "Feature",
"properties": {
"@id": "node/9095193153",
"brand": "Undiz",
"brand:wikidata": "Q105306275",
"clothes": "underwear",
"name": "Undiz",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3599579,
48.0782846
]
},
"id": "node/9095193153"
},
{
"type": "Feature",
"properties": {
"@id": "node/9095193154",
"branch": "Lingerie",
"brand": "RougeGorge",
"brand:wikidata": "Q104600739",
"clothes": "underwear",
"name": "RougeGorge",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3604883,
48.0781607
]
},
"id": "node/9095193154"
},
{
"type": "Feature",
"properties": {
"@id": "node/9095212690",
"alt_name": "North Face",
"brand": "The North Face",
"brand:wikidata": "Q152784",
"brand:wikipedia": "en:The North Face",
"check_date": "2024-05-19",
"name": "The North Face",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3603923,
48.0773727
]
},
"id": "node/9095212690"
},
{
"type": "Feature",
"properties": {
"@id": "node/9095270059",
"air_conditioning": "no",
"clothes": "men",
"level": "0",
"name": "Maison Aume",
"second_hand": "no",
"shop": "clothes",
"wheelchair": "no"
},
"geometry": {
"type": "Point",
"coordinates": [
7.361364,
48.0799999
]
},
"id": "node/9095270059"
},
{
"type": "Feature",
"properties": {
"@id": "node/9098624272",
"name": "Destock Place",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3575161,
48.0793009
]
},
"id": "node/9098624272"
},
{
"type": "Feature",
"properties": {
"@id": "node/9123861652",
"name": "Weackers",
"shop": "shoes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.361329,
48.0785972
]
},
"id": "node/9123861652"
},
{
"type": "Feature",
"properties": {
"@id": "node/9162179887",
"brand": "Calzedonia",
"brand:wikidata": "Q1027874",
"brand:wikipedia": "en:Calzedonia",
"name": "Calzedonia",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3606374,
48.0780809
]
},
"id": "node/9162179887"
},
{
"type": "Feature",
"properties": {
"@id": "node/9162206449",
"clothes": "women",
"name": "Cop. Copine",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3600947,
48.078399
]
},
"id": "node/9162206449"
},
{
"type": "Feature",
"properties": {
"@id": "node/9162226360",
"brand": "Okaïdi",
"brand:wikidata": "Q3350027",
"brand:wikipedia": "fr:Okaïdi",
"name": "Okaïdi",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3596986,
48.078428
]
},
"id": "node/9162226360"
},
{
"type": "Feature",
"properties": {
"@id": "node/9162227010",
"brand": "Jules",
"brand:wikidata": "Q3188386",
"brand:wikipedia": "fr:Jules (enseigne)",
"clothes": "men",
"name": "Jules",
"opening_hours": "Mo-Sa 09:30-19:00",
"phone": "+33 3 89 41 03 62",
"shop": "clothes",
"website": "https://www.jules.com/fr-fr/magasins/1600133/"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3600323,
48.0782229
]
},
"id": "node/9162227010"
},
{
"type": "Feature",
"properties": {
"@id": "node/10151865029",
"name": "Atelier Cinq",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3571756,
48.0772657
]
},
"id": "node/10151865029"
},
{
"type": "Feature",
"properties": {
"@id": "node/10862176110",
"name": "L'hexagone",
"shop": "bag"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3808571,
48.0814138
]
},
"id": "node/10862176110"
},
{
"type": "Feature",
"properties": {
"@id": "node/11150877331",
"brand": "Punt Roma",
"brand:wikidata": "Q101423290",
"clothes": "women",
"name": "Punt Roma",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3571859,
48.0779406
]
},
"id": "node/11150877331"
},
{
"type": "Feature",
"properties": {
"@id": "node/11150959880",
"name": "Caroll",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3579354,
48.0779291
]
},
"id": "node/11150959880"
},
{
"type": "Feature",
"properties": {
"@id": "node/11302242094",
"branch": "Wintzenheim",
"name": "Label Fripe",
"opening_hours": "Mo-Sa 09:00-18:45",
"phone": "+33 3 89 27 39 25",
"second_hand": "only",
"shop": "clothes",
"website": "https://labelfripe.fr/label-fripe-wintzenheim/"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3109899,
48.0850362
]
},
"id": "node/11302242094"
},
{
"type": "Feature",
"properties": {
"@id": "node/11392247003",
"name": "Lingerie Sipp",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3111507,
48.0841835
]
},
"id": "node/11392247003"
},
{
"type": "Feature",
"properties": {
"@id": "node/11778819781",
"addr:city": "Colmar",
"addr:housenumber": "10",
"addr:postcode": "68000",
"addr:street": "Rue des Têtes",
"clothes": "suits;hats;men",
"name": "Phillipe",
"phone": "0389411983",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3559389,
48.0789064
]
},
"id": "node/11778819781"
},
{
"type": "Feature",
"properties": {
"@id": "node/11799215969",
"brand": "Petit Bateau",
"brand:wikidata": "Q3377090",
"name": "Petit Bateau",
"opening_hours": "Mo-Sa 10:00-19:00; Su 10:00-18:00",
"phone": "+33 3 89 24 97 85",
"shop": "clothes",
"website": "https://stores.petit-bateau.com/france/colmar/9-rue-des-boulangers"
},
"geometry": {
"type": "Point",
"coordinates": [
7.355149,
48.0780213
]
},
"id": "node/11799215969"
},
{
"type": "Feature",
"properties": {
"@id": "node/11816704669",
"addr:housenumber": "10",
"addr:street": "Rue des Boulangers",
"name": "des petits hauts",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3555001,
48.0780768
]
},
"id": "node/11816704669"
},
{
"type": "Feature",
"properties": {
"@id": "node/12320343534",
"addr:city": "Colmar",
"addr:housenumber": "44",
"addr:postcode": "68000",
"addr:street": "Rue des Clefs",
"brand": "Un Jour Ailleurs",
"brand:wikidata": "Q105106211",
"clothes": "women",
"name": "Un Jour Ailleurs",
"opening_hours": "Mo-Fr 10:00-19:00; Sa 10:00-18:30",
"phone": "+33368318572",
"shop": "clothes",
"website": "https://boutique.unjourailleurs.com/fr/mode-femme/boutique-colmar-76"
},
"geometry": {
"type": "Point",
"coordinates": [
7.35897,
48.0789807
]
},
"id": "node/12320343534"
},
{
"type": "Feature",
"properties": {
"@id": "node/12320343536",
"addr:city": "Colmar",
"addr:housenumber": "38",
"addr:postcode": "68000",
"addr:street": "Rue des Clefs",
"brand": "Timberland",
"brand:wikidata": "Q1539185",
"name": "Timberland",
"opening_hours": "Mo-Sa 10:00-19:00",
"phone": "+33389298650",
"shop": "clothes"
},
"geometry": {
"type": "Point",
"coordinates": [
7.3592409,
48.0788785
]
},
"id": "node/12320343536"
}
]
}

View File

@ -1,350 +0,0 @@
# pylint: skip-file
import numpy as np
import json
import os
from typing import Optional, Literal
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from pydantic import BaseModel
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
from math import sin, cos, sqrt, atan2, radians
EARTH_RADIUS_KM = 6373
class ShoppingLocation(BaseModel):
type: Literal['street', 'area']
importance: int
centroid: tuple
start: Optional[list] = None
end: Optional[list] = None
# Output to frontend
class Landmark(BaseModel) :
# Properties of the landmark
name : str
type: Literal['sightseeing', 'nature', 'shopping', 'start', 'finish']
location : tuple
osm_type : str
osm_id : int
attractiveness : int
n_tags : int
image_url : Optional[str] = None
website_url : Optional[str] = None
description : Optional[str] = None # TODO future
duration : Optional[int] = 0
name_en : Optional[str] = None
# Additional properties depending on specific tour
must_do : Optional[bool] = False
must_avoid : Optional[bool] = False
is_secondary : Optional[bool] = False
time_to_reach_next : Optional[int] = 0
next_uuid : Optional[str] = None
def extract_points(filestr: str) :
"""
Extract points from geojson file.
Returns :
np.array containing the points
"""
points = []
with open(os.path.dirname(__file__) + '/' + filestr, 'r') as f:
geojson = json.load(f)
for feature in geojson['features']:
if feature['geometry']['type'] == 'Point':
centroid = feature['geometry']['coordinates']
points.append(centroid)
elif feature['geometry']['type'] == 'Polygon':
centroid = np.array(feature['geometry']['coordinates'][0][0])
points.append(centroid)
# Convert the list of points to a NumPy array
return np.array(points)
def get_distance(p1: tuple[float, float], p2: tuple[float, float]) -> int:
"""
Calculate the time in minutes to travel from one location to another.
Args:
p1 (tuple[float, float]): Coordinates of the starting location.
p2 (tuple[float, float]): Coordinates of the destination.
Returns:
int: Time to travel from p1 to p2 in minutes.
"""
if p1 == p2:
return 0
else:
# Compute the distance in km along the surface of the Earth
# (assume spherical Earth)
# this is the haversine formula, stolen from stackoverflow
# in order to not use any external libraries
lat1, lon1 = radians(p1[0]), radians(p1[1])
lat2, lon2 = radians(p2[0]), radians(p2[1])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return EARTH_RADIUS_KM * c
def filter_clusters(cluster_points, cluster_labels):
"""
Remove clusters of less importance.
"""
label_counts = np.bincount(cluster_labels)
# Step 3: Get the indices (labels) of the 5 largest clusters
top_5_labels = np.argsort(label_counts)[-5:] # Get the largest 5 clusters
# Step 4: Filter points to keep only the points in the top 5 clusters
filtered_cluster_points = []
filtered_cluster_labels = []
for label in top_5_labels:
filtered_cluster_points.append(cluster_points[cluster_labels == label])
filtered_cluster_labels.append(np.full((label_counts[label],), label)) # Replicate the label
# Concatenate filtered clusters into a single array
return np.vstack(filtered_cluster_points), np.concatenate(filtered_cluster_labels)
def fit_lines(points, labels):
"""
Fit lines to identified clusters.
"""
all_x = []
all_y = []
lines = []
locations = []
for label in set(labels):
cluster_points = points[labels == label]
# If there's not enough points, skip
if len(cluster_points) < 2:
continue
# Apply PCA to find the principal component (i.e., the line of best fit)
pca = PCA(n_components=1)
pca.fit(cluster_points)
direction = pca.components_[0]
centroid = pca.mean_
# Project the cluster points onto the principal direction (line direction)
projections = np.dot(cluster_points - centroid, direction)
# Get the range of the projections to find the approximate length of the cluster
cluster_length = projections.max() - projections.min()
# Now adjust `t` so that it scales with the cluster length
t = np.linspace(-cluster_length / 2.75, cluster_length / 2.75, 10)
# Calculate the start and end of the line based on min/max projections
start_point = centroid[0] + t*direction[0]
end_point = centroid[1] + t*direction[1]
# Store the line
lines.append((start_point, end_point))
# For visualization, store the points
all_x.append(min(start_point))
all_x.append(max(start_point))
all_y.append(min(end_point))
all_y.append(max(end_point))
if np.linalg.norm(t) <= 0.0045 :
loc = ShoppingLocation(
type='area',
centroid=tuple((centroid[1], centroid[0])),
importance = len(cluster_points),
)
else :
loc = ShoppingLocation(
type='street',
centroid=tuple((centroid[1], centroid[0])),
importance = len(cluster_points),
start=start_point,
end=end_point
)
locations.append(loc)
xmin = min(all_x)
xmax = max(all_x)
ymin = min(all_y)
ymax = max(all_y)
corners = (xmin, xmax, ymin, ymax)
return corners, locations
def create_landmark(shopping_location: ShoppingLocation):
# Define the bounding box for a given radius around the coordinates
lat, lon = shopping_location.centroid
bbox = ("around:1000", str(lat), str(lon))
overpass = Overpass()
# CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
# Query neighborhoods and shopping malls
selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"', '"shop"="mall"']
min_dist = float('inf')
new_name = 'Shopping Area'
new_name_en = None
osm_id = 0
osm_type = 'node'
for sel in selectors :
query = overpassQueryBuilder(
bbox = bbox,
elementType = ['node', 'way', 'relation'],
selector = sel,
includeCenter = True,
out = 'center'
)
try:
result = overpass.query(query)
except Exception as e:
            raise Exception("query unsuccessful") from e
for elem in result.elements():
location = (elem.centerLat(), elem.centerLon())
if location[0] is None :
location = (elem.lat(), elem.lon())
if location[0] is None :
# print(f"Fetching coordinates failed with {elem.type()}/{elem.id()}")
continue
# print(f"Distance : {get_distance(shopping_location.centroid, location)}")
d = get_distance(shopping_location.centroid, location)
if d < min_dist :
min_dist = d
new_name = elem.tag('name')
osm_type = elem.type() # Add type: 'way' or 'relation'
osm_id = elem.id() # Add OSM id
# add english name if it exists
try :
new_name_en = elem.tag('name:en')
except:
pass
return Landmark(
name=new_name,
type='shopping',
        location=shopping_location.centroid, # TODO: use the fact that we can also recognize streets.
attractiveness=shopping_location.importance,
n_tags=0,
osm_id=osm_id,
osm_type=osm_type,
name_en=new_name_en
)
# Extract points
points = extract_points('vienna_data.json')
# print(len(points))
######## Create a figure with 1 row and 3 columns for side-by-side plots
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Plot Raw data points
axes[0].set_title('Raw Data')
axes[0].scatter(points[:, 0], points[:, 1], color='blue', s=20)
# Apply DBSCAN to find clusters. Choose different settings for different cities.
if len(points) > 400 :
dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities
else :
dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities
labels = dbscan.fit_predict(points)
# Separate clustered points and noise points
clustered_points = points[labels != -1]
clustered_labels = labels[labels != -1]
noise_points = points[labels == -1]
######## Plot n°1: DBSCAN Clustering Results
axes[1].set_title('DBSCAN Clusters')
axes[1].scatter(clustered_points[:, 0], clustered_points[:, 1], c=clustered_labels, cmap='rainbow', s=20)
axes[1].scatter(noise_points[:, 0], noise_points[:, 1], c='blue', s=7, label='Noise')
# Keep the 5 biggest clusters
clustered_points, clustered_labels = filter_clusters(clustered_points, clustered_labels)
# Fit lines
corners, locations = fit_lines(clustered_points, clustered_labels)
(xmin, xmax, ymin, ymax) = corners
######## Plot clustered points in normal size and noise points separately
axes[2].scatter(clustered_points[:, 0], clustered_points[:, 1], c=clustered_labels, cmap='rainbow', s=30)
axes[2].set_title('PCA Fitted Lines on Clusters')
# Create a list of Landmarks for the shopping things
shopping_landmarks = []
for loc in locations :
axes[2].scatter(loc.centroid[1], loc.centroid[0], color='red', marker='x', s=200, linewidth=3)
landmark = create_landmark(loc)
shopping_landmarks.append(landmark)
axes[2].text(loc.centroid[1], loc.centroid[0], landmark.name,
ha='center', va='top', fontsize=6,
bbox=dict(facecolor='white', edgecolor='black', boxstyle='round,pad=0.2'),
zorder=3)
####### Plot the detected lines in the final plot #######
# for loc in locations:
# if loc.type == 'street' :
# line_x = loc.start
# line_y = loc.end
# axes[2].plot(line_x, line_y, color='lime', linewidth=3)
# else :
axes[0].set_xlim(xmin-0.01, xmax+0.01)
axes[0].set_ylim(ymin-0.01, ymax+0.01)
axes[1].set_xlim(xmin-0.01, xmax+0.01)
axes[1].set_ylim(ymin-0.01, ymax+0.01)
axes[2].set_xlim(xmin-0.01, xmax+0.01)
axes[2].set_ylim(ymin-0.01, ymax+0.01)
print("\n\n\n")
for landmark in shopping_landmarks :
print(f"{landmark.name} is a shopping area with a score of {landmark.attractiveness}")
plt.tight_layout()
plt.show()

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -45,8 +45,11 @@ class Landmark(BaseModel) :
osm_id : int
attractiveness : int
n_tags : int
# Optional properties to gather more information.
image_url : Optional[str] = None
website_url : Optional[str] = None
wiki_url : Optional[str] = None
description : Optional[str] = None # TODO future
duration : Optional[int] = 0
name_en : Optional[str] = None
@ -62,6 +65,10 @@ class Landmark(BaseModel) :
time_to_reach_next : Optional[int] = 0
next_uuid : Optional[UUID] = None
# More properties to define the score
is_viewpoint : Optional[bool] = False
is_place_of_worship : Optional[bool] = False
def __str__(self) -> str:
"""
String representation of the Landmark object.
@ -136,7 +143,9 @@ class Toilets(BaseModel) :
str: A formatted string with the toilets location.
"""
return f'Toilets @{self.location}'
class Config:
# This allows us to easily convert the model to and from dictionaries
from_attributes = True
"""
This allows us to easily convert the model to and from dictionaries
"""
from_attributes = True
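
A quick illustration of what from_attributes = True enables (a minimal sketch assuming pydantic v2; ToiletsSketch and the namespace row are hypothetical stand-ins, not project code):

# Sketch only: from_attributes lets a model be validated from any object
# exposing matching attributes (e.g. an ORM row), not just from dicts.
from types import SimpleNamespace
from pydantic import BaseModel, ConfigDict

class ToiletsSketch(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # same effect as the Config class above
    location: tuple

row = SimpleNamespace(location=(48.0797, 7.3585))  # any attribute-bearing object
t = ToiletsSketch.model_validate(row)              # attributes -> model
print(t.model_dump())                              # model -> plain dict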

View File

@ -1,7 +1,7 @@
"""Linked and ordered list of Landmarks that represents the visiting order."""
from .landmark import Landmark
from ..utils.get_time_separation import get_time
from ..utils.get_time_distance import get_time
class LinkedLandmarks:
"""

View File

@ -1,42 +0,0 @@
"""Collection of tests to ensure correct handling of invalid input."""
from fastapi.testclient import TestClient
import pytest
from .test_utils import load_trip_landmarks
from ..main import app
@pytest.fixture(scope="module")
def client():
"""Client used to call the app."""
return TestClient(app)
def test_cache(client, request): # pylint: disable=redefined-outer-name
"""
    Test : ensure that a trip loaded from the cache matches the original computation.
Args:
client:
request:
"""
duration_minutes = 15
response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [48.084588, 7.280405]
}
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
landmarks_cached = load_trip_landmarks(client, result['first_landmark_uuid'], True)
# checks :
assert response.status_code == 200 # check for successful planning
assert landmarks_cached == landmarks

View File

@ -1,9 +1,9 @@
"""Collection of tests to ensure correct implementation and track progress. """
import time
from fastapi.testclient import TestClient
import pytest
from .test_utils import landmarks_to_osmid, load_trip_landmarks, log_trip_details
from .test_utils import load_trip_landmarks, log_trip_details
from ..main import app
@pytest.fixture(scope="module")
@ -20,7 +20,9 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
client:
request:
"""
duration_minutes = 15
start_time = time.time() # Start timer
duration_minutes = 20
response = client.post(
"/trip/new",
json={
@ -35,14 +37,24 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
# for elem in landmarks :
# print(elem)
# checks :
assert response.status_code == 200 # check for successful planning
assert isinstance(landmarks, list) # check that the return type is a list
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
    assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2, "Trip duration not within 20% of desired length"
assert len(landmarks) > 2 # check that there is something to visit
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
# assert 2!= 3
def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
@ -53,7 +65,10 @@ def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 120
response = client.post(
"/trip/new",
json={
@ -67,22 +82,227 @@ def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
osm_ids = landmarks_to_osmid(landmarks)
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
for elem in landmarks :
print(elem)
print(elem.osm_id)
# for elem in landmarks :
# print(elem)
# checks :
assert response.status_code == 200 # check for successful planning
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
assert 136200148 in osm_ids # check for Cathédrale St. Jean in trip
# assert response.status_code == 2000 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
def test_cologne(client, request) : # pylint: disable=redefined-outer-name
"""
    Test n°3 : Custom test in Cologne centre to ensure proper decision making in a crowded area.
Args:
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 240
response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [50.942352665, 6.957777972392]
}
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
# for elem in landmarks :
# print(elem)
# checks :
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
def test_strasbourg(client, request) : # pylint: disable=redefined-outer-name
"""
    Test n°4 : Custom test in Strasbourg centre to ensure proper decision making in a crowded area.
Args:
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 180
response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [48.5846589226, 7.74078715721]
}
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
# for elem in landmarks :
# print(elem)
# checks :
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
def test_zurich(client, request) : # pylint: disable=redefined-outer-name
"""
    Test n°5 : Custom test in Zurich centre to ensure proper decision making in a crowded area.
Args:
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 180
response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [47.377884227, 8.5395114066]
}
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
# for elem in landmarks :
# print(elem)
# checks :
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
def test_paris(client, request) : # pylint: disable=redefined-outer-name
"""
    Test n°6 : Custom test in central Paris (les Halles) to ensure proper decision making in a crowded area.
Args:
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 300
response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [48.86248803298562, 2.346451131285925]
}
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
# for elem in landmarks :
# print(elem)
# checks :
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
def test_new_york(client, request) : # pylint: disable=redefined-outer-name
"""
    Test n°7 : Custom test in central New York to ensure proper decision making in a crowded area.
Args:
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 600
response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [40.72592726802, -73.9920434795]
}
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
# for elem in landmarks :
# print(elem)
# checks :
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
def test_shopping(client, request) : # pylint: disable=redefined-outer-name
"""
@ -92,7 +312,9 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 240
response = client.post(
"/trip/new",
json={
@ -107,27 +329,17 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
for elem in landmarks :
print(elem)
# checks :
assert response.status_code == 200 # check for successful planning
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
# def test_new_trip_single_prefs(client):
# response = client.post(
# "/trip/new",
# json={
# "preferences": {"sightseeing": {"type": "sightseeing", "score": 1},
# "nature": {"type": "nature", "score": 1},
# "shopping": {"type": "shopping", "score": 1},
# "max_time_minute": 360,
# "detour_tolerance_minute": 0},
# "start": [48.8566, 2.3522]
# }
# )
# assert response.status_code == 200
# def test_new_trip_matches_prefs(client):
# pass
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"

View File

@ -6,11 +6,13 @@ import pytest
from ..structs.landmark import Toilets
from ..main import app
@pytest.fixture(scope="module")
def client():
"""Client used to call the app."""
return TestClient(app)
@pytest.mark.parametrize(
"location,radius,status_code",
[
@ -39,8 +41,6 @@ def test_invalid_input(client, location, radius, status_code): # pylint: disa
assert response.status_code == status_code
@pytest.mark.parametrize(
"location,status_code",
[
@ -66,11 +66,10 @@ def test_no_toilets(client, location, status_code): # pylint: disable=redefin
toilets_list = [Toilets.model_validate(toilet) for toilet in response.json()]
# checks :
assert response.status_code == 200 # check for successful planning
assert response.status_code == status_code # check for successful planning
assert isinstance(toilets_list, list) # check that the return type is a list
@pytest.mark.parametrize(
"location,status_code",
[
@ -97,6 +96,6 @@ def test_toilets(client, location, status_code): # pylint: disable=redefined-
toilets_list = [Toilets.model_validate(toilet) for toilet in response.json()]
# checks :
assert response.status_code == 200 # check for successful planning
assert response.status_code == status_code # check for successful planning
assert isinstance(toilets_list, list) # check that the return type is a list
assert len(toilets_list) > 0
assert len(toilets_list) > 0

View File

@ -23,45 +23,7 @@ def landmarks_to_osmid(landmarks: list[Landmark]) -> list[int] :
return ids
def fetch_landmark(client, landmark_uuid: str):
"""
Fetch landmark data from the API based on the landmark UUID.
Args:
landmark_uuid (str): The UUID of the landmark.
Returns:
dict: Landmark data fetched from the API.
"""
logger = logging.getLogger(__name__)
response = client.get(f"/landmark/{landmark_uuid}")
if response.status_code != 200:
raise HTTPException(status_code=500,
detail=f"Failed to fetch landmark with UUID {landmark_uuid}: {response.status_code}")
try:
json_data = response.json()
logger.info(f"API Response: {json_data}")
except ValueError as e:
logger.error(f"Failed to parse response as JSON: {response.text}")
raise HTTPException(status_code=500, detail="Invalid response format from API")
# Try validating against the Landmark model here to ensure consistency
try:
landmark = Landmark(**json_data)
except ValidationError as ve:
logging.error(f"Validation error: {ve}")
raise HTTPException(status_code=500, detail="Invalid data format received from API")
if "detail" in json_data:
raise HTTPException(status_code=500, detail=json_data["detail"])
return Landmark(**json_data)
def fetch_landmark_cache(landmark_uuid: str):
def fetch_landmark(landmark_uuid: str):
"""
Fetch landmark data from the cache based on the landmark UUID.
@ -75,26 +37,24 @@ def fetch_landmark_cache(landmark_uuid: str):
# Try to fetch the landmark data from the cache
try:
landmark = cache_client.get(f"landmark_{landmark_uuid}")
landmark = cache_client.get(f'landmark_{landmark_uuid}')
if not landmark :
logger.warning(f"Cache miss for landmark UUID: {landmark_uuid}")
raise HTTPException(status_code=404, detail=f"Landmark with UUID {landmark_uuid} not found in cache.")
logger.warning(f'Cache miss for landmark UUID: {landmark_uuid}')
raise HTTPException(status_code=404, detail=f'Landmark with UUID {landmark_uuid} not found in cache.')
# Validate that the fetched data is a dictionary
if not isinstance(landmark, Landmark):
logger.error(f"Invalid cache data format for landmark UUID: {landmark_uuid}. Expected dict, got {type(landmark).__name__}.")
logger.error(f'Invalid cache data format for landmark UUID: {landmark_uuid}. Expected dict, got {type(landmark).__name__}.')
raise HTTPException(status_code=500, detail="Invalid cache data format.")
return landmark
except Exception as exc:
logger.error(f"Unexpected error occurred while fetching landmark UUID {landmark_uuid}: {exc}")
logger.error(f'Unexpected error occurred while fetching landmark UUID {landmark_uuid}: {exc}')
raise HTTPException(status_code=500, detail="An unexpected error occurred while fetching the landmark from the cache") from exc
def load_trip_landmarks(client, first_uuid: str, from_cache=None) -> list[Landmark]:
def load_trip_landmarks(client, first_uuid: str) -> list[Landmark]:
"""
Load all landmarks for a trip using the response from the API.
@ -108,10 +68,7 @@ def load_trip_landmarks(client, first_uuid: str, from_cache=None) -> list[Landma
next_uuid = first_uuid
while next_uuid is not None:
if from_cache :
landmark = fetch_landmark_cache(next_uuid)
else :
landmark = fetch_landmark(client, next_uuid)
landmark = fetch_landmark(next_uuid)
landmarks.append(landmark)
next_uuid = landmark.next_uuid # Prepare for the next iteration
@ -122,14 +79,14 @@ def load_trip_landmarks(client, first_uuid: str, from_cache=None) -> list[Landma
def log_trip_details(request, landmarks: list[Landmark], duration: int, target_duration: int) :
"""
    Shows the detailed trip in the HTML test report.
Args:
request:
landmarks (list): the ordered list of visited landmarks
duration (int): the total duration of this trip
target_duration(int): the target duration of this trip
"""
trip_string = [f"{landmark.name} ({landmark.attractiveness} | {landmark.duration}) - {landmark.time_to_reach_next}" for landmark in landmarks]
trip_string = [f'{landmark.name} ({landmark.attractiveness} | {landmark.duration}) - {landmark.time_to_reach_next}' for landmark in landmarks]
# Pass additional info to pytest for reporting
request.node.trip_details = trip_string
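
For context, a sketch of the kind of conftest.py glue that could surface request.node.trip_details in the HTML report (assumes pytest-html's documented hooks; illustrative only, not the project's actual conftest, and exact rendering depends on the pytest-html version):

# Hypothetical conftest.py sketch (assumes pytest-html is installed).
import pytest

@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    # Copy the attribute set by log_trip_details onto the test report.
    outcome = yield
    report = outcome.get_result()
    report.trip_details = getattr(item, 'trip_details', None)

def pytest_html_results_table_html(report, data):
    # Render one line per landmark in the report's HTML section.
    if getattr(report, 'trip_details', None):
        data.append('<br>'.join(report.trip_details))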

View File

View File

@ -1,17 +1,21 @@
"""Find clusters of interest to add more general areas of visit to the tour."""
import logging
from typing import Literal
import numpy as np
from sklearn.cluster import DBSCAN
from pydantic import BaseModel
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
from ..overpass.overpass import Overpass, get_base_info
from ..structs.landmark import Landmark
from ..utils.get_time_separation import get_distance
from .get_time_distance import get_distance
from ..constants import OSM_CACHE_DIR
# silence the overpass logger
logging.getLogger('Overpass').setLevel(level=logging.CRITICAL)
class Cluster(BaseModel):
""""
A class representing an interesting area for shopping or sightseeing.
@ -34,11 +38,24 @@ class Cluster(BaseModel):
class ClusterManager:
"""
A manager responsible for clustering points of interest, such as shops or historic sites,
to identify areas worth visiting. It uses the DBSCAN algorithm to detect clusters
based on a set of points retrieved from OpenStreetMap (OSM).
Attributes:
logger (logging.Logger): Logger for capturing relevant events and errors.
valid (bool): Indicates whether clusters were successfully identified.
all_points (list): All points retrieved from OSM, representing locations of interest.
cluster_points (list): Points identified as part of a cluster.
cluster_labels (list): Labels corresponding to the clusters each point belongs to.
cluster_type (Literal['sightseeing', 'shopping']): Type of clustering, either for sightseeing
landmarks or shopping areas.
"""
logger = logging.getLogger(__name__)
# NOTE: all points are in (lat, lon) format
valid: bool # Ensure the manager is valid (ie there are some clusters to be found)
valid: bool # Ensure the manager is valid (ie there are some clusters to be found)
all_points: list
cluster_points: list
cluster_labels: list
@ -61,65 +78,79 @@ class ClusterManager:
Args:
bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
"""
# Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
# Initialize overpass and cache
self.overpass = Overpass()
CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
self.cluster_type = cluster_type
if cluster_type == 'shopping' :
elem_type = ['node']
sel = ['"shop"~"^(bag|boutique|clothes)$"']
out = 'skel'
osm_types = ['node']
sel = '"shop"~"^(bag|boutique|clothes)$"'
out = 'ids center'
elif cluster_type == 'sightseeing' :
osm_types = ['way']
sel = '"historic"~"^(monument|building|yes)$"'
out = 'ids center'
else :
elem_type = ['way']
sel = ['"historic"="building"']
out = 'center'
raise NotImplementedError("Please choose only an available option for cluster detection")
# Initialize the points for cluster detection
query = overpassQueryBuilder(
bbox = bbox,
elementType = elem_type,
query = self.overpass.build_query(
area = bbox,
osm_types = osm_types,
selector = sel,
includeCenter = True,
out = out
)
self.logger.debug(f"Cluster query: {query}")
try:
result = self.overpass.query(query)
result = self.overpass.send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
if len(result.elements()) == 0 :
if result is None :
self.logger.error(f"Error fetching {cluster_type} clusters, overpass query returned None.")
self.valid = False
else :
points = []
for elem in result.elements() :
coords = tuple((elem.lat(), elem.lon()))
if coords[0] is None :
coords = tuple((elem.centerLat(), elem.centerLon()))
points.append(coords)
for osm_type in osm_types :
for elem in result.findall(osm_type):
# Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type)
if coords is not None :
points.append(coords)
self.all_points = np.array(points)
self.valid = True
if points :
self.all_points = np.array(points)
# Apply DBSCAN to find clusters. Choose different settings for different cities.
if self.cluster_type == 'shopping' and len(self.all_points) > 200 :
dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities
elif self.cluster_type == 'sightseeing' :
dbscan = DBSCAN(eps=0.0025, min_samples=15, algorithm='kd_tree') # for historic neighborhoods
else :
dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities
labels = dbscan.fit_predict(self.all_points)
                # Check that there is at least 1 cluster
if len(set(labels)) > 1 :
self.logger.debug(f"Found {len(set(labels))} different clusters.")
# Separate clustered points and noise points
self.cluster_points = self.all_points[labels != -1]
self.cluster_labels = labels[labels != -1]
                    self.filter_clusters() # Keep only the largest clusters (sometimes raises a ValueError, cause unknown).
self.valid = True
else :
self.logger.debug(f"Detected 0 {cluster_type} clusters.")
self.valid = False
# Apply DBSCAN to find clusters. Choose different settings for different cities.
if self.cluster_type == 'shopping' and len(self.all_points) > 200 :
dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities
elif self.cluster_type == 'sightseeing' :
dbscan = DBSCAN(eps=0.0025, min_samples=15, algorithm='kd_tree') # for historic neighborhoods
else :
dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities
labels = dbscan.fit_predict(self.all_points)
# Separate clustered points and noise points
self.cluster_points = self.all_points[labels != -1]
self.cluster_labels = labels[labels != -1]
# filter the clusters to keep only the largest ones
self.filter_clusters()
self.logger.debug(f"Detected 0 {cluster_type} clusters.")
self.valid = False
def generate_clusters(self) -> list[Landmark]:
@ -147,7 +178,7 @@ class ClusterManager:
# Extract points belonging to the current cluster
current_cluster = self.cluster_points[self.cluster_labels == label]
# Calculate the centroid as the mean of the points
centroid = np.mean(current_cluster, axis=0)
@ -188,7 +219,7 @@ class ClusterManager:
# Define the bounding box for a given radius around the coordinates
lat, lon = cluster.centroid
bbox = ("around:1000", str(lat), str(lon))
bbox = (1000, lat, lon)
# Query neighborhoods and shopping malls
selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"']
@ -197,60 +228,56 @@ class ClusterManager:
selectors.append('"shop"="mall"')
new_name = 'Shopping Area'
t = 40
else :
else :
new_name = 'Neighborhood'
t = 15
min_dist = float('inf')
new_name_en = None
osm_id = 0
osm_type = 'node'
osm_types = ['node', 'way', 'relation']
for sel in selectors :
query = overpassQueryBuilder(
bbox = bbox,
elementType = ['node', 'way', 'relation'],
for sel in selectors :
query = self.overpass.build_query(
area = bbox,
osm_types = osm_types,
selector = sel,
includeCenter = True,
out = 'center'
out = 'ids center'
)
try:
result = self.overpass.query(query)
result = self.overpass.send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
continue
for elem in result.elements():
location = (elem.centerLat(), elem.centerLon())
if result is None :
self.logger.error(f"Error fetching landmarks: {e}")
continue
if location[0] is None :
location = (elem.lat(), elem.lon())
if location[0] is None :
            for ot in osm_types :
                for elem in result.findall(ot):
                    id, coords, name = get_base_info(elem, ot, with_name=True)
if name is None or coords is None :
continue
d = get_distance(cluster.centroid, location)
if d < min_dist :
min_dist = d
new_name = elem.tag('name')
osm_type = elem.type() # Add type: 'way' or 'relation'
osm_id = elem.id() # Add OSM id
d = get_distance(cluster.centroid, coords)
if d < min_dist :
min_dist = d
new_name = name
                        osm_type = ot # Add type: 'way' or 'relation'
osm_id = id # Add OSM id
# Add english name if it exists
try :
new_name_en = elem.tag('name:en')
except:
pass
return Landmark(
name=new_name,
type=self.cluster_type,
location=cluster.centroid, # TODO: use the fact the we can also recognize streets.
            location=cluster.centroid, # later: use the fact that we can also recognize streets.
attractiveness=cluster.importance,
n_tags=0,
osm_id=osm_id,
osm_type=osm_type,
name_en=new_name_en,
duration=t
)
@ -277,6 +304,5 @@ class ClusterManager:
filtered_cluster_labels.append(np.full((label_counts[label],), label)) # Replicate the label
# update the cluster points and labels with the filtered data
self.cluster_points = np.vstack(filtered_cluster_points)
        self.cluster_points = np.vstack(filtered_cluster_points) # np.vstack raises ValueError if the filtered list is empty
self.cluster_labels = np.concatenate(filtered_cluster_labels)
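
For reference, a standalone sketch of the DBSCAN step used above, on synthetic (lat, lon) points (assumes scikit-learn; eps is in degrees, so 0.00075 is roughly 80 m at these latitudes):

# Synthetic example of the clustering step (illustration, not project code).
import numpy as np
from sklearn.cluster import DBSCAN

rng = np.random.default_rng(0)
blob_a = rng.normal((48.078, 7.358), 0.0003, size=(40, 2))       # dense shopping street
blob_b = rng.normal((48.081, 7.362), 0.0003, size=(40, 2))       # second dense area
noise = rng.uniform((48.07, 7.35), (48.09, 7.37), size=(30, 2))  # scattered shops
points = np.vstack([blob_a, blob_b, noise])

# Same settings as the 'small cities' branch above; label -1 marks noise.
labels = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree').fit_predict(points)
for label in sorted(set(labels) - {-1}):
    members = points[labels == label]
    print(f"cluster {label}: {len(members)} points, centroid {members.mean(axis=0)}")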

View File

@ -1,8 +1,10 @@
import yaml
"""Contains various helper functions to help with distance or score computations."""
from math import sin, cos, sqrt, atan2, radians
import yaml
from ..constants import OPTIMIZER_PARAMETERS_PATH
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
DETOUR_FACTOR = parameters['detour_factor']
@ -10,6 +12,7 @@ with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
EARTH_RADIUS_KM = 6373
def get_time(p1: tuple[float, float], p2: tuple[float, float]) -> int:
"""
Calculate the time in minutes to travel from one location to another.
@ -21,25 +24,23 @@ def get_time(p1: tuple[float, float], p2: tuple[float, float]) -> int:
Returns:
int: Time to travel from p1 to p2 in minutes.
"""
# if p1 == p2:
# return 0
# else:
# Compute the distance in km along the surface of the Earth
# (assume spherical Earth)
# this is the haversine formula, stolen from stackoverflow
# in order to not use any external libraries
lat1, lon1 = radians(p1[0]), radians(p1[1])
lat2, lon2 = radians(p2[0]), radians(p2[1])
dlon = lon2 - lon1
dlat = lat2 - lat1
if p1 == p2:
return 0
else:
# Compute the distance in km along the surface of the Earth
# (assume spherical Earth)
# this is the haversine formula, stolen from stackoverflow
# in order to not use any external libraries
lat1, lon1 = radians(p1[0]), radians(p1[1])
lat2, lon2 = radians(p2[0]), radians(p2[1])
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
distance = EARTH_RADIUS_KM * c
distance = EARTH_RADIUS_KM * c
    # Consider the detour factor for an average city
walk_distance = distance * DETOUR_FACTOR
@ -47,7 +48,7 @@ def get_time(p1: tuple[float, float], p2: tuple[float, float]) -> int:
# Time to walk this distance (in minutes)
walk_time = walk_distance / AVERAGE_WALKING_SPEED * 60
return round(walk_time)
return min(round(walk_time), 32765)
def get_distance(p1: tuple[float, float], p2: tuple[float, float]) -> int:
@ -61,22 +62,19 @@ def get_distance(p1: tuple[float, float], p2: tuple[float, float]) -> int:
Returns:
        float: Distance from p1 to p2 in kilometers.
"""
if p1 == p2:
return 0
else:
# Compute the distance in km along the surface of the Earth
# (assume spherical Earth)
# this is the haversine formula, stolen from stackoverflow
# in order to not use any external libraries
lat1, lon1 = radians(p1[0]), radians(p1[1])
lat2, lon2 = radians(p2[0]), radians(p2[1])
# Compute the distance in km along the surface of the Earth
# (assume spherical Earth)
# this is the haversine formula, stolen from stackoverflow
# in order to not use any external libraries
lat1, lon1 = radians(p1[0]), radians(p1[1])
lat2, lon2 = radians(p2[0]), radians(p2[1])
dlon = lon2 - lon1
dlat = lat2 - lat1
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return EARTH_RADIUS_KM * c
return EARTH_RADIUS_KM * c
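
As a quick sanity check of the haversine helper above (coordinates are approximate):

# Usage sketch for get_distance (assumes this module is importable).
from .get_time_distance import get_distance

colmar = (48.0779, 7.3585)     # Colmar centre
turckheim = (48.0846, 7.2805)  # Turckheim
d = get_distance(colmar, turckheim)
print(f"{d:.2f} km")           # roughly 5.9 km as the crow flies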

View File

@ -1,27 +1,33 @@
import math, yaml, logging
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
"""Module used to import data from OSM and arrange them in categories."""
import logging
import xml.etree.ElementTree as ET
import yaml
from ..structs.preferences import Preferences
from ..structs.landmark import Landmark
from .take_most_important import take_most_important
from .cluster_manager import ClusterManager
from ..overpass.overpass import Overpass, get_base_info
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR
# silence the overpass logger
logging.getLogger('OSMPythonTools').setLevel(level=logging.CRITICAL)
logging.getLogger('Overpass').setLevel(level=logging.CRITICAL)
class LandmarkManager:
"""
Use this to manage landmarks.
Uses the overpass api to fetch landmarks and classify them.
"""
logger = logging.getLogger(__name__)
radius_close_to: int # radius in meters
    church_coeff: float # coeff to adjust score of churches
nature_coeff: float # coeff to adjust score of parks
overall_coeff: float # coeff to adjust weight of tags
N_important: int # number of important landmarks to consider
n_important: int # number of important landmarks to consider
def __init__(self) -> None:
@ -42,15 +48,17 @@ class LandmarkManager:
self.wikipedia_bonus = parameters['wikipedia_bonus']
self.viewpoint_bonus = parameters['viewpoint_bonus']
self.pay_bonus = parameters['pay_bonus']
self.N_important = parameters['N_important']
self.n_important = parameters['N_important']
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.walking_speed = parameters['average_walking_speed']
self.detour_factor = parameters['detour_factor']
self.overpass = Overpass()
CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
# Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
        self.logger.info('LandmarkManager successfully initialized.')
def generate_landmarks_list(self, center_coordinates: tuple[float, float], preferences: Preferences) -> tuple[list[Landmark], list[Landmark]]:
@ -70,120 +78,89 @@ class LandmarkManager:
- A list of all existing landmarks.
- A list of the most important landmarks based on the user's preferences.
"""
max_walk_dist = (preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor
self.logger.debug('Starting to fetch landmarks...')
max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor)
reachable_bbox_side = min(max_walk_dist, self.max_bbox_side)
# use set to avoid duplicates, this requires some __methods__ to be set in Landmark
all_landmarks = set()
# Create a bbox using the around technique
bbox = tuple((f"around:{reachable_bbox_side/2}", str(center_coordinates[0]), str(center_coordinates[1])))
        # Create a bbox using the around technique: (radius_m, lat, lon)
bbox = tuple((min(2000, reachable_bbox_side/2), center_coordinates[0], center_coordinates[1]))
# list for sightseeing
if preferences.sightseeing.score != 0:
score_function = lambda score: score * 10 * preferences.sightseeing.score / 5
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, score_function)
self.logger.debug('Fetching sightseeing landmarks...')
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score)
all_landmarks.update(current_landmarks)
self.logger.debug('Fetching sightseeing clusters...')
# special pipeline for historic neighborhoods
neighborhood_manager = ClusterManager(bbox, 'sightseeing')
historic_clusters = neighborhood_manager.generate_clusters()
all_landmarks.update(historic_clusters)
self.logger.debug('Sightseeing clusters done')
# list for nature
if preferences.nature.score != 0:
score_function = lambda score: score * 10 * self.nature_coeff * preferences.nature.score / 5
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, score_function)
self.logger.debug('Fetching nature landmarks...')
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score)
all_landmarks.update(current_landmarks)
# list for shopping
if preferences.shopping.score != 0:
score_function = lambda score: score * 10 * preferences.shopping.score / 5
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, score_function)
self.logger.debug('Fetching shopping landmarks...')
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score)
self.logger.debug('Fetching shopping clusters...')
            # set time for all shopping activities :
for landmark in current_landmarks : landmark.duration = 30
for landmark in current_landmarks :
landmark.duration = 30
all_landmarks.update(current_landmarks)
# special pipeline for shopping malls
shopping_manager = ClusterManager(bbox, 'shopping')
shopping_clusters = shopping_manager.generate_clusters()
all_landmarks.update(shopping_clusters)
self.logger.debug('Shopping clusters done')
landmarks_constrained = take_most_important(all_landmarks, self.N_important)
self.logger.info(f'Generated {len(all_landmarks)} landmarks around {center_coordinates}, and constrained to {len(landmarks_constrained)} most important ones.')
landmarks_constrained = take_most_important(all_landmarks, self.n_important)
# self.logger.info(f'All landmarks generated : {len(all_landmarks)} landmarks around {center_coordinates}, and constrained to {len(landmarks_constrained)} most important ones.')
return all_landmarks, landmarks_constrained
def count_elements_close_to(self, coordinates: tuple[float, float]) -> int:
def set_landmark_score(self, landmark: Landmark, landmarktype: str, preference_level: int) :
"""
Count the number of OpenStreetMap elements (nodes, ways, relations) within a specified radius of the given location.
Calculate and set the attractiveness score for a given landmark.
This function constructs a bounding box around the specified coordinates based on the radius. It then queries
OpenStreetMap data to count the number of elements within that bounding box.
This method evaluates the landmark's attractiveness based on its properties
(number of tags, presence of Wikipedia URL, image, website, and whether it's
a place of worship) and adjusts the score using the user's preference level.
Args:
coordinates (tuple[float, float]): The latitude and longitude of the location to search around.
Returns:
int: The number of elements (nodes, ways, relations) within the specified radius. Returns 0 if no elements
are found or if an error occurs during the query.
landmark (Landmark): The landmark object to score.
            landmarktype (str): The type of the landmark.
preference_level (int): The user's preference level for this landmark type.
"""
lat = coordinates[0]
lon = coordinates[1]
score = landmark.n_tags**self.tag_exponent
if landmark.wiki_url :
score *= self.wikipedia_bonus
if landmark.image_url :
score *= self.image_bonus
if landmark.website_url :
score *= self.wikipedia_bonus
if landmark.is_place_of_worship :
score *= self.church_coeff
if landmarktype == 'nature' :
score *= self.nature_coeff
radius = self.radius_close_to
alpha = (180 * radius) / (6371000 * math.pi)
bbox = {'latLower':lat-alpha,'lonLower':lon-alpha,'latHigher':lat+alpha,'lonHigher': lon+alpha}
# Build the query to find elements within the radius
radius_query = overpassQueryBuilder(
bbox=[bbox['latLower'],
bbox['lonLower'],
bbox['latHigher'],
bbox['lonHigher']],
elementType=['node', 'way', 'relation']
)
try:
radius_result = self.overpass.query(radius_query)
N_elem = radius_result.countWays() + radius_result.countRelations()
self.logger.debug(f"There are {N_elem} ways/relations within 50m")
if N_elem is None:
return 0
return N_elem
except:
return 0
landmark.attractiveness = int(score * preference_level * 2)
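
To make the scoring concrete, a small worked example with illustrative parameter values (the real coefficients are loaded from the landmark parameters file and may differ):

# Illustration only: tag_exponent and wikipedia_bonus are assumed values.
tag_exponent = 1.15
wikipedia_bonus = 1.3

n_tags = 20
score = n_tags ** tag_exponent    # ~31.3
score *= wikipedia_bonus          # landmark has a wiki_url -> ~40.8
preference_level = 5              # user preference for this category
print(int(score * preference_level * 2))  # attractiveness -> 407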
# def create_bbox(self, coordinates: tuple[float, float], reachable_bbox_side: int) -> tuple[float, float, float, float]:
# """
# Create a bounding box around the given coordinates.
# Args:
# coordinates (tuple[float, float]): The latitude and longitude of the center of the bounding box.
# reachable_bbox_side (int): The side length of the bounding box in meters.
# Returns:
# tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude
# defining the bounding box.
# """
# # Half the side length in m (since it's a square bbox)
# half_side_length_m = reachable_bbox_side / 2
# return tuple((f"around:{half_side_length_m}", str(coordinates[0]), str(coordinates[1])))
def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, score_function: callable) -> list[Landmark]:
def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, preference_level: int) -> list[Landmark]:
"""
Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates.
@ -210,171 +187,126 @@ class LandmarkManager:
# caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously
# we need to split the selectors into separate queries and merge the results
for sel in dict_to_selector_list(amenity_selector):
self.logger.debug(f"Current selector: {sel}")
# self.logger.debug(f"Current selector: {sel}")
# query_conditions = ['count_tags()>5']
# if landmarktype == 'shopping' : # use this later for shopping clusters
# element_types = ['node']
element_types = ['way', 'relation']
osm_types = ['way', 'relation']
            query_conditions = [] # define before the conditional to avoid a NameError below
            if 'viewpoint' in sel :
query_conditions = []
element_types.append('node')
osm_types.append('node')
query = overpassQueryBuilder(
bbox = bbox,
elementType = element_types,
# selector can in principle be a list already,
# but it generates the intersection of the queries
# we want the union
query = self.overpass.build_query(
area = bbox,
osm_types = osm_types,
selector = sel,
conditions = query_conditions, # except for nature....
includeCenter = True,
out = 'center'
)
self.logger.debug(f"Query: {query}")
try:
result = self.overpass.query(query)
result = self.overpass.send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
continue
for elem in result.elements():
return_list += self.xml_to_landmarks(result, landmarktype, preference_level)
name = elem.tag('name')
location = (elem.centerLat(), elem.centerLon())
osm_type = elem.type() # Add type: 'way' or 'relation'
osm_id = elem.id() # Add OSM id
# TODO: exclude these from the get go
# handle unprecise and no-name locations
if name is None or location[0] is None:
if osm_type == 'node' and 'viewpoint' in elem.tags().values():
name = 'Viewpoint'
name_en = 'Viewpoint'
location = (elem.lat(), elem.lon())
else :
continue
# skip if part of another building
if 'building:part' in elem.tags().keys() and elem.tag('building:part') == 'yes':
continue
elem_type = landmarktype # Add the landmark type as 'sightseeing,
n_tags = len(elem.tags().keys()) # Add number of tags
score = n_tags**self.tag_exponent # Add score
website_url = None
image_url = None
name_en = None
# Adjust scoring, browse through tag keys
skip = False
for tag_key in elem.tags().keys():
if "pay" in tag_key:
# payment options are misleading and should not count for the scoring.
score += self.pay_bonus
if "disused" in tag_key:
# skip disused amenities
skip = True
break
if "building:" in tag_key:
# do not count the building description as being particularly useful
n_tags -= 1
if "boundary" in tag_key:
# skip "areas" like administrative boundaries and stuff
skip = True
break
if "historic" in tag_key and elem.tag('historic') in ['manor', 'optical_telegraph', 'pound', 'shieling', 'wayside_cross']:
# skip useless amenities
skip = True
break
if "name" in tag_key :
score += self.name_bonus
if "wiki" in tag_key:
# wikipedia entries count more
score += self.wikipedia_bonus
if "image" in tag_key:
# images must count more
score += self.image_bonus
if elem_type != "nature":
if "leisure" in tag_key and elem.tag('leisure') == "park":
elem_type = "nature"
if landmarktype != "shopping":
if "shop" in tag_key:
skip = True
break
if tag_key == "building" and elem.tag('building') in ['retail', 'supermarket', 'parking']:
skip = True
break
# Extract image, website and english name
if tag_key in ['website', 'contact:website']:
website_url = elem.tag(tag_key)
if tag_key == 'image':
image_url = elem.tag('image')
if tag_key =='name:en':
name_en = elem.tag('name:en')
if skip:
continue
# Don't visit random apartments
if 'apartments' in elem.tags().values():
continue
score = score_function(score)
if "place_of_worship" in elem.tags().values() :
if "cathedral" not in elem.tags().values() :
score = score * self.church_coeff
duration = 5
else :
duration = 10
elif 'viewpoint' in elem.tags().values() :
# viewpoints must count more
score = score * self.viewpoint_bonus
duration = 10
elif "museum" in elem.tags().values() or "aquarium" in elem.tags().values() or "planetarium" in elem.tags().values():
duration = 60
else:
duration = 5
# finally create our own landmark object
landmark = Landmark(
name = name,
type = elem_type,
location = location,
osm_type = osm_type,
osm_id = osm_id,
attractiveness = int(score),
must_do = False,
n_tags = int(n_tags),
duration = int(duration),
name_en = name_en,
image_url = image_url,
website_url = website_url
)
return_list.append(landmark)
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
return return_list
def xml_to_landmarks(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]:
"""
Parse the Overpass API result and extract landmarks.
This method processes the XML root element returned by the Overpass API and
extracts landmarks of types 'node', 'way', and 'relation'. It retrieves
relevant information such as name, coordinates, and tags, and converts them
into Landmark objects.
Args:
root (ET.Element): The root element of the XML response from Overpass API.
            landmarktype (str): The type of landmark (e.g. 'sightseeing', 'nature', 'shopping').
            preference_level (int): The user's preference level for this landmark type.
Returns:
list[Landmark]: A list of Landmark objects extracted from the XML data.
"""
if root is None :
return []
landmarks = []
for osm_type in ['node', 'way', 'relation'] :
for elem in root.findall(osm_type):
osm_id, coords, name = get_base_info(elem, osm_type, with_name=True)
if name is None or coords is None :
continue
tags = elem.findall('tag')
# Convert this to Landmark object
landmark = Landmark(name=name,
type=landmarktype,
location=coords,
osm_id=osm_id,
osm_type=osm_type,
attractiveness=0,
duration=5, # default visit time, raised below for special tags
n_tags=len(tags))
# Browse through tags to add information to landmark.
for tag in tags:
key = tag.get('k')
value = tag.get('v')
# Skip this landmark if not suitable.
if key == 'building:part' and value == 'yes' :
break
if 'disused:' in key :
break
if 'boundary:' in key :
break
if 'shop' in key and landmarktype != 'shopping' :
break
# if value == 'apartments' :
# break
# Fill in the other attributes.
if key == 'image' :
landmark.image_url = value
if key == 'website' :
landmark.website_url = value
if value == 'place_of_worship' : # place_of_worship is a tag value (amenity=place_of_worship), not a key
landmark.is_place_of_worship = True
if key == 'wikipedia' :
landmark.wiki_url = value
if key == 'name:en' :
landmark.name_en = value
if 'building:' in key or 'pay' in key :
landmark.n_tags -= 1
# Set the duration (without resetting it on every unrelated tag).
if value in ['museum', 'aquarium', 'planetarium'] :
landmark.duration = 60
elif value == 'viewpoint' :
landmark.is_viewpoint = True
landmark.duration = 10
elif value == 'cathedral' :
landmark.is_place_of_worship = False
landmark.duration = 10
else:
self.set_landmark_score(landmark, landmarktype, preference_level)
landmarks.append(landmark)
continue
return landmarks
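# Illustrative usage (a sketch, assuming a manager instance exposing the
# set_landmark_score() method used above; the XML mimics a minimal Overpass
# response for a single named node):
#
#   import xml.etree.ElementTree as ET
#   root = ET.fromstring(
#       '<osm><node id="1" lat="48.8584" lon="2.2945">'
#       '<tag k="name" v="Tour Eiffel"/><tag k="name:en" v="Eiffel Tower"/>'
#       '</node></osm>')
#   landmarks = manager.xml_to_landmarks(root, 'sightseeing', preference_level=5)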
def dict_to_selector_list(d: dict) -> list:
"""
Convert a dictionary of key-value pairs to a list of Overpass query strings.
@ -387,10 +319,10 @@ def dict_to_selector_list(d: dict) -> list:
"""
return_list = []
for key, value in d.items():
if type(value) == list:
if isinstance(value, list):
val = '|'.join(value)
return_list.append(f'{key}~"^({val})$"')
elif type(value) == str and len(value) == 0:
elif isinstance(value, str) and len(value) == 0:
return_list.append(f'{key}')
else:
return_list.append(f'{key}={value}')
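# Illustrative usage (sketch): each dictionary entry becomes one selector
# string, with lists joined into a regex alternation and empty strings
# producing a bare key match.
#
#   >>> dict_to_selector_list({'amenity': ['bar', 'cafe'], 'tourism': '', 'fee': 'no'})
#   ['amenity~"^(bar|cafe)$"', 'tourism', 'fee=no']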

View File

@ -1,524 +0,0 @@
import logging
import yaml
import numpy as np
from scipy.optimize import linprog
from collections import defaultdict, deque
from ..structs.landmark import Landmark
from .get_time_separation import get_time
from ..constants import OPTIMIZER_PARAMETERS_PATH
class Optimizer:
logger = logging.getLogger(__name__)
detour: int = None # accepted max detour time (in minutes)
detour_factor: float # detour factor of straight line vs real distance in cities
average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit
overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive
def __init__(self) :
# load parameters from file
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.detour_factor = parameters['detour_factor']
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks = parameters['max_landmarks']
self.overshoot = parameters['overshoot']
# Prevent the use of a particular solution
def prevent_config(self, resx):
"""
Prevent the use of a particular solution by adding constraints to the optimization.
Args:
resx (list[float]): List of edge weights.
Returns:
tuple[list[int], list[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector.
"""
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # Number of edges
L = int(np.sqrt(N)) # Number of landmarks
nonzeroind = np.nonzero(resx)[0] # np.nonzero returns a tuple of arrays, so take the first
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
vertices_visited = ind_a
vertices_visited.remove(0)
ones = [1]*L
h = [0]*N
for i in range(L) :
if i in vertices_visited :
h[i*L:i*L+L] = ones
return h, [len(vertices_visited)-1]
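# Sketch of the resulting cut: if the rejected solution departed from
# vertices {1, 3} (start vertex 0 excluded), the new row h puts ones on all
# edges leaving 1 and 3, and the bound is 1, so the solver may reuse at most
# one of those departures in the next iteration.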
# Prevents the creation of the same circle (both directions)
def prevent_circle(self, circle_vertices: list, L: int) :
"""
Prevent circular paths by adding constraints to the optimization.
Args:
circle_vertices (list): List of vertices forming a circle.
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector.
"""
l1 = [0]*L*L
l2 = [0]*L*L
for i, node in enumerate(circle_vertices[:-1]) :
next_node = circle_vertices[i+1]
l1[node*L + next_node] = 1
l2[next_node*L + node] = 1
s = circle_vertices[0]
g = circle_vertices[-1]
l1[g*L + s] = 1
l2[s*L + g] = 1
return np.vstack((l1, l2)), [0, 0]
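# Sketch for L = 3 and a detected sub-circle [1, 2]: both returned rows put
# ones on x[1*L + 2] and x[2*L + 1], and the right-hand side is 0, so the
# equality constraints forbid using the edges 1 -> 2 and 2 -> 1 at all.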
def is_connected(self, resx) :
"""
Determine the order of visits and detect any circular paths in the given configuration.
Args:
resx (list): List of edge weights.
Returns:
tuple[list[int], Optional[list[list[int]]]]: A tuple containing the visit order and a list of any detected circles.
"""
# first round the results to have only 0-1 values
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # np.nonzero returns a tuple of arrays, so take the first
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
ind_b = nonzero_tup[1].tolist()
# Step 1: Create a graph representation
graph = defaultdict(list)
for a, b in zip(ind_a, ind_b):
graph[a].append(b)
# Step 2: Function to perform BFS/DFS to extract journeys
def get_journey(start):
journey_nodes = []
visited = set()
stack = deque([start])
while stack:
node = stack.pop()
if node not in visited:
visited.add(node)
journey_nodes.append(node)
for neighbor in graph[node]:
if neighbor not in visited:
stack.append(neighbor)
return journey_nodes
# Step 3: Extract all journeys
all_journeys_nodes = []
visited_nodes = set()
for node in ind_a:
if node not in visited_nodes:
journey_nodes = get_journey(node)
all_journeys_nodes.append(journey_nodes)
visited_nodes.update(journey_nodes)
for l in all_journeys_nodes :
if 0 in l :
order = l
all_journeys_nodes.remove(l)
break
if len(all_journeys_nodes) == 0 :
return order, None
return order, all_journeys_nodes
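# Illustrative call (sketch) for L = 3 with the two edges 0 -> 2 and 2 -> 1,
# i.e. x[0*3 + 2] = 1 and x[2*3 + 1] = 1, and no sub-circles:
#
#   resx = [0, 0, 1,  0, 0, 0,  0, 1, 0]
#   order, circles = optimizer.is_connected(resx)
#   # order == [0, 2, 1], circles is None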
def init_ub_dist(self, landmarks: list[Landmark], max_time: int):
"""
Initialize the objective function coefficients and inequality constraints for the optimization problem.
This function computes the distances between all landmarks and stores their attractiveness to maximize sightseeing.
The goal is to maximize the objective function subject to the constraints A*x < b and A_eq*x = b_eq.
Args:
landmarks (list[Landmark]): List of landmarks.
max_time (int): Maximum time of visit allowed.
Returns:
tuple[list[float], list[float], list[int]]: Objective function coefficients, inequality constraint coefficients, and the right-hand side of the inequality constraint.
"""
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = []
# Coefficients of inequality constraints (left-hand side)
A_ub = []
for spot1 in landmarks :
dist_table = [0]*len(landmarks)
c.append(-spot1.attractiveness)
for j, spot2 in enumerate(landmarks) :
t = get_time(spot1.location, spot2.location) + spot1.duration
dist_table[j] = t
closest = sorted(dist_table)[:25]
for i, dist in enumerate(dist_table) :
if dist not in closest :
dist_table[i] = 32700 # near the int16 maximum: effectively removes the edge
A_ub += dist_table
c = c*len(landmarks)
return c, A_ub, [max_time*self.overshoot]
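# Shapes produced above, with x[i*L + j] meaning "edge i -> j is used":
#   c[i*L + j]    = -attractiveness(j)               (reward for arriving at j)
#   A_ub[i*L + j] = walk_time(i -> j) + duration(i)  (one row: total tour time;
#                   entries outside the 25 nearest are set to 32700 to disable them)
#   b_ub          = [max_time * self.overshoot]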
def respect_number(self, L, max_landmarks: int):
"""
Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks.
Args:
L (int): Number of landmarks.
max_landmarks (int): Maximum number of landmarks that may be visited.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
ones = [1]*L
zeros = [0]*L
A = ones + zeros*(L-1)
b = [1]
for i in range(L-1) :
h_new = zeros*i + ones + zeros*(L-1-i)
A = np.vstack((A, h_new))
b.append(1)
A = np.vstack((A, ones*L))
b.append(max_landmarks+1)
return A, b
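# For L = 3 the rows generated above look like this (x[i*L + j] == edge i -> j):
#   [1 1 1 0 0 0 0 0 0] <= 1                  # at most one departure from 0
#   [0 0 0 1 1 1 0 0 0] <= 1                  # at most one departure from 1
#   [0 0 0 0 0 0 1 1 1] <= 1                  # at most one departure from 2
#   [1 1 1 1 1 1 1 1 1] <= max_landmarks + 1  # cap on the total number of edges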
# Constraint to not have d14 and d41 simultaneously. Does not prevent cyclic paths with more elements
def break_sym(self, L):
"""
Generate constraints to prevent simultaneous travel between two landmarks in both directions.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
A = [0]*L*L
b = [1]
for i, _ in enumerate(up_ind_x[1:]) :
l = [0]*L*L
if up_ind_x[i] != up_ind_y[i] :
l[up_ind_x[i]*L + up_ind_y[i]] = 1
l[up_ind_y[i]*L + up_ind_x[i]] = 1
A = np.vstack((A,l))
b.append(1)
return A, b
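# For L = 3 the meaningful rows (zero padding rows omitted) pair each edge
# with its reverse:
#   x[0*3 + 1] + x[1*3 + 0] <= 1   # cannot use 0 -> 1 and 1 -> 0 together
#   x[0*3 + 2] + x[2*3 + 0] <= 1
#   x[1*3 + 2] + x[2*3 + 1] <= 1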
def init_eq_not_stay(self, L: int):
"""
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
Args:
L (int): Number of landmarks.
Returns:
tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints.
"""
l = [0]*L*L
for i in range(L) :
for j in range(L) :
if j == i :
l[j + i*L] = 1
l = np.array(l, dtype=np.int8)
return [l], [0]
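# For L = 3 the single equality row zeroes out the diagonal (self-loops):
#   [1 0 0 0 1 0 0 0 1] * x == 0   # forbids x00, x11 and x22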
def respect_user_must_do(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_do'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = [0]*L*L
b = [0]
for i, elem in enumerate(landmarks[1:], start=1) : # start=1 keeps i aligned with the landmark index
if elem.must_do is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
l[i*L:i*L+L] = [1]*L # set mandatory departures from landmarks tagged as 'must_do'
A = np.vstack((A,l))
b.append(1)
return A, b
def respect_user_must_avoid(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_avoid' are skipped in the optimization.
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_avoid'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = [0]*L*L
b = [0]
for i, elem in enumerate(landmarks[1:], start=1) : # start=1 keeps i aligned with the landmark index
if elem.must_avoid is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
l[i*L:i*L+L] = [1]*L
A = np.vstack((A,l))
b.append(0) # prevent departures from landmarks tagged as 'must_avoid'
return A, b
# Constraint to ensure start at start and finish at goal
def respect_start_finish(self, L: int):
"""
Generate constraints to ensure that the optimization starts at the designated start landmark and finishes at the goal landmark.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
l_start = [1]*L + [0]*L*(L-1) # sets departures only for start (horizontal ones)
l_start[L-1] = 0 # prevents the jump from start to finish
l_goal = [0]*L*L # sets arrivals only for finish (vertical ones)
l_L = [0]*L*(L-1) + [1]*L # prevents arrivals at start and departures from goal
for k in range(L-1) : # sets only vertical ones for goal (go to)
l_L[k*L] = 1
if k != 0 :
l_goal[k*L+L-1] = 1
A = np.vstack((l_start, l_goal))
b = [1, 1]
A = np.vstack((A,l_L))
b.append(0)
return A, b
def respect_order(self, L: int):
"""
Generate constraints that link arrivals and departures at each interior landmark, tying the tour together; note that this does not fully prevent sub-circles.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
A = [0]*L*L
b = [0]
for i in range(L-1) : # Prevent stacked ones
if i == 0 or i == L-1: # Don't touch start or finish
continue
else :
l = [0]*L
l[i] = -1
l = l*L
for j in range(L) :
l[i*L + j] = 1
A = np.vstack((A,l))
b.append(0)
return A, b
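# Each generated row encodes flow conservation for an interior vertex i:
#   sum_j x[i*L + j] - sum_{k != i} x[k*L + i] == 0
# i.e. the tour departs from i exactly as often as it arrives there.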
def link_list(self, order: list[int], landmarks: list[Landmark])->list[Landmark] :
"""
Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times.
Args:
order (list[int]): List of indices representing the order of landmarks to visit.
landmarks (list[Landmark]): List of all landmarks.
Returns:
list[Landmark]: The updated linked list of landmarks with travel times.
"""
L = []
j = 0
while j < len(order)-1 :
# get landmarks involved
elem = landmarks[order[j]]
nxt = landmarks[order[j+1]]
# get attributes
elem.time_to_reach_next = get_time(elem.location, nxt.location)
elem.must_do = True
elem.location = (round(elem.location[0], 5), round(elem.location[1], 5))
elem.next_uuid = nxt.uuid
L.append(elem)
j += 1
nxt.location = (round(nxt.location[0], 5), round(nxt.location[1], 5))
nxt.must_do = True
L.append(nxt)
return L
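# Illustrative call (sketch, with a hypothetical solved visit order):
#
#   order = [0, 2, 1, 3]                   # indices as returned by is_connected
#   linked = optimizer.link_list(order, landmarks)
#   # each returned landmark now carries time_to_reach_next and next_uuid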
# Main optimization pipeline
def solve_optimization(
self,
max_time: int,
landmarks: list[Landmark],
max_landmarks: int = None
) -> list[Landmark]:
"""
Main optimization pipeline to solve the landmark visiting problem.
This method sets up and solves a linear programming problem with constraints to find an optimal tour of landmarks,
considering user-defined must-visit landmarks, start and finish points, and ensuring no cycles are present.
Args:
max_time (int): Maximum time allowed for the tour in minutes.
landmarks (list[Landmark]): List of landmarks to visit.
max_landmarks (int): Maximum number of landmarks visited
Returns:
list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found.
"""
if max_landmarks is None :
max_landmarks = self.max_landmarks
L = len(landmarks)
# SET CONSTRAINTS FOR INEQUALITY
c, A_ub, b_ub = self.init_ub_dist(landmarks, max_time) # Add the distances from each landmark to the other
A, b = self.respect_number(L, max_landmarks) # Respect max number of visits (no more possible stops than landmarks).
A_ub = np.vstack((A_ub, A), dtype=np.int16)
b_ub += b
A, b = self.break_sym(L) # break the 'zig-zag' symmetry
A_ub = np.vstack((A_ub, A), dtype=np.int16)
b_ub += b
# SET CONSTRAINTS FOR EQUALITY
A_eq, b_eq = self.init_eq_not_stay(L) # Force solution not to stay in same place
A, b = self.respect_user_must_do(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_user_must_avoid(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_start_finish(L) # Force start and finish positions
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_order(L) # Respect order of visit (only works when max_time is limiting factor)
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
# SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
x_bounds = [(0, 1)]*L*L
# Solve linear programming problem
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
# Raise error if no solution is found
if not res.success :
raise ArithmeticError("No solution could be found, the problem is overconstrained. Try with a longer trip (>30 minutes).")
# If there is a solution, we're good to go, just check for connectedness
order, circles = self.is_connected(res.x)
#nodes, edges = is_connected(res.x)
i = 0
timeout = 80
while circles is not None and i < timeout:
A, b = self.prevent_config(res.x)
A_ub = np.vstack((A_ub, A))
b_ub += b
#A_ub, b_ub = prevent_circle(order, len(landmarks), A_ub, b_ub)
for circle in circles :
A, b = self.prevent_circle(circle, L)
A_eq = np.vstack((A_eq, A))
b_eq += b
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
if not res.success :
raise ArithmeticError("Solving failed because of an overconstrained problem")
order, circles = self.is_connected(res.x)
if circles is None :
break
i += 1
if i == timeout :
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
#sort the landmarks in the order of the solution
tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, score: {int(-res.fun)}")
return tour
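# Typical end-to-end use (sketch; the landmark list is assumed to start with
# the 'start' landmark and end with the 'finish' landmark, as the constraints
# above expect):
#
#   optimizer = Optimizer()
#   tour = optimizer.solve_optimization(max_time=180, landmarks=landmarks)
#   for lm in tour:
#       print(lm.name, lm.time_to_reach_next)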

View File

@ -1,3 +1,4 @@
"""Helper function to return only the major landmarks from a large list."""
from ..structs.landmark import Landmark
def take_most_important(landmarks: list[Landmark], n_important) -> list[Landmark]:

View File

@ -1,16 +1,34 @@
import logging, yaml
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
"""Module for finding public toilets around given coordinates."""
import logging
import xml.etree.ElementTree as ET
from ..overpass.overpass import Overpass, get_base_info
from ..structs.landmark import Toilets
from ..constants import LANDMARK_PARAMETERS_PATH, OSM_CACHE_DIR
from ..constants import OSM_CACHE_DIR
# silence the overpass logger
logging.getLogger('OSMPythonTools').setLevel(level=logging.CRITICAL)
logging.getLogger('Overpass').setLevel(level=logging.CRITICAL)
class ToiletsManager:
"""
Manages the process of fetching and caching toilet information from
OpenStreetMap (OSM) based on a specified location and radius.
This class is responsible for:
- Fetching toilet data from OSM using Overpass API around a given set of
coordinates (latitude, longitude).
- Using a caching strategy to optimize requests by saving and retrieving
data from a local cache.
- Logging important events and errors related to data fetching.
Attributes:
logger (logging.Logger): Logger for the class to capture events.
location (tuple[float, float]): Latitude and longitude representing the
location to search around.
radius (int): The search radius in meters for finding nearby toilets.
overpass (Overpass): The Overpass API instance used to query OSM.
"""
logger = logging.getLogger(__name__)
location: tuple[float, float]
@ -21,58 +39,87 @@ class ToiletsManager:
self.radius = radius
self.location = location
self.overpass = Overpass()
CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
# Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
def generate_toilet_list(self) -> list[Toilets] :
"""
Generates a list of toilet locations by fetching data from OpenStreetMap (OSM)
around the given coordinates stored in `self.location`.
# Create a bbox using the around technique
bbox = tuple((f"around:{self.radius}", str(self.location[0]), str(self.location[1])))
Returns:
list[Toilets]: A list of `Toilets` objects containing detailed information
about the toilets found around the given coordinates.
"""
bbox = (self.radius, self.location[0], self.location[1])
osm_types = ['node', 'way', 'relation']
toilets_list = []
query = overpassQueryBuilder(
bbox = bbox,
elementType = ['node', 'way', 'relation'],
# selector can in principle be a list already,
# but it generates the intersection of the queries
# we want the union
selector = ['"amenity"="toilets"'],
includeCenter = True,
out = 'center'
query = self.overpass.build_query(
area = bbox,
osm_types = osm_types,
selector = '"amenity"="toilets"',
out = 'ids center tags'
)
self.logger.debug(f"Query: {query}")
try:
result = self.overpass.query(query)
result = self.overpass.send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
return None
for elem in result.elements():
location = (elem.centerLat(), elem.centerLon())
# handle unprecise and no-name locations
if location[0] is None:
location = (elem.lat(), elem.lon())
else :
continue
toilets = Toilets(location=location)
if 'wheelchair' in elem.tags().keys() and elem.tag('wheelchair') == 'yes':
toilets.wheelchair = True
if 'changing_table' in elem.tags().keys() and elem.tag('changing_table') == 'yes':
toilets.changing_table = True
if 'fee' in elem.tags().keys() and elem.tag('fee') == 'yes':
toilets.fee = True
if 'opening_hours' in elem.tags().keys() :
toilets.opening_hours = elem.tag('opening_hours')
toilets_list.append(toilets)
toilets_list = self.xml_to_toilets(result)
return toilets_list
def xml_to_toilets(self, root: ET.Element) -> list[Toilets]:
"""
Parse the Overpass API result and extract toilet locations.
This method processes the XML root element returned by the Overpass API and
extracts toilets from elements of type 'node', 'way', and 'relation',
converting their coordinates and tags into Toilets objects.
Args:
root (ET.Element): The root element of the XML response from Overpass API.
Returns:
list[Toilets]: A list of Toilets objects extracted from the XML data.
"""
if root is None :
return []
toilets_list = []
for osm_type in ['node', 'way', 'relation'] :
for elem in root.findall(osm_type):
# Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type)
if coords is None :
continue
toilets = Toilets(location=coords)
# Extract tags as a dictionary
tags = {tag.get('k'): tag.get('v') for tag in elem.findall('tag')}
if tags.get('wheelchair') == 'yes':
toilets.wheelchair = True
if tags.get('changing_table') == 'yes':
toilets.changing_table = True
if tags.get('fee') == 'yes':
toilets.fee = True
if 'opening_hours' in tags :
toilets.opening_hours = tags['opening_hours']
toilets_list.append(toilets)
return toilets_list
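# Illustrative usage (sketch; the keyword arguments match the attributes set
# in __init__ above, and the coordinates are arbitrary):
#
#   manager = ToiletsManager(location=(48.8566, 2.3522), radius=500)
#   toilets = manager.generate_toilet_list()
#   for t in toilets:
#       print(t.location, t.wheelchair, t.opening_hours)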