cluster recognition added to backend pipeline
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 3m0s
Run linting on the backend code / Build (pull_request) Failing after 29s
Run testing on the backend code / Build (pull_request) Failing after 2m9s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 15s

This commit is contained in:
Helldragon67 2024-12-04 18:34:43 +01:00
parent d9be7b0707
commit 7f77ecab04
4 changed files with 206 additions and 124 deletions

File diff suppressed because one or more lines are too long

View File

@@ -78,6 +78,36 @@ def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name
     assert 136200148 in osm_ids # check for Cathédrale St. Jean in trip


+def test_shopping(client, request) :   # pylint: disable=redefined-outer-name
+    """
+    Test n°3 : Custom test in Lyon centre to ensure shopping clusters are found.
+
+    Args:
+        client: the test client used to call the API
+        request: pytest request fixture, used to attach trip details to the report
+    """
+    duration_minutes = 600
+    response = client.post(
+        "/trip/new",
+        json={
+            "preferences": {"sightseeing": {"type": "sightseeing", "score": 0},
+                            "nature": {"type": "nature", "score": 0},
+                            "shopping": {"type": "shopping", "score": 5},
+                            "max_time_minute": duration_minutes,
+                            "detour_tolerance_minute": 0},
+            "start": [45.7576485, 4.8330241]
+        }
+    )
+    result = response.json()
+    landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
+    osm_ids = landmarks_to_osmid(landmarks)
+
+    # Add details to report
+    log_trip_details(request, landmarks, result['total_time'], duration_minutes)
+
+    # checks :
+    assert response.status_code == 200  # check for successful planning
+    assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
+
 # def test_new_trip_single_prefs(client):
 #     response = client.post(

View File

@@ -1,165 +1,208 @@
-import logging, yaml
-from typing import Optional, Literal
+import logging
+from typing import Literal

 import numpy as np
 from sklearn.cluster import DBSCAN
-from sklearn.decomposition import PCA
 from pydantic import BaseModel
 from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
 from OSMPythonTools.cachingStrategy import CachingStrategy, JSON

 from ..structs.landmark import Landmark
-from ..structs.preferences import Preferences
+from ..utils.get_time_separation import get_distance
 from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR


 class ShoppingLocation(BaseModel):
     type: Literal['street', 'area']
     importance: int
     centroid: tuple
-    start: Optional[list] = None
-    end: Optional[list] = None
+    # start: Optional[list] = None    # for later use if we want to have streets as well
+    # end: Optional[list] = None


 class ShoppingManager:

+    logger = logging.getLogger(__name__)
+
+    # NOTE: all points are in (lat, lon) format
+    valid: bool                        # Ensure the manager is valid (ie there are some clusters to be found)
+    all_points: list
+    cluster_points: list
+    cluster_labels: list
+    shopping_locations: list[ShoppingLocation]
+
-    def __init__(self) -> None:
-
-        with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
-            parameters = yaml.safe_load(f)
-            self.walking_speed = parameters['average_walking_speed']
-            self.detour_factor = parameters['detour_factor']
-
-    def generate_landmarks_list(self, center_coordinates: tuple[float, float], preferences: Preferences) :
-
-        max_walk_dist = (preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor
-        reachable_bbox_side = min(max_walk_dist, self.max_bbox_side)
-
-        # use set to avoid duplicates, this requires some __methods__ to be set in Landmark
-        shopping_landmarks = set()
-
-        # Create a bbox using the around technique.
-        bbox = tuple((f"around:{reachable_bbox_side/2}", str(center_coordinates[0]), str(center_coordinates[1])))
-        # list for sightseeing
+    def __init__(self, bbox: tuple) -> None:
+        """
+        Upon initialization, generate the list of shops used for cluster points.
+        """
+        # Initialize overpass and cache
+        self.overpass = Overpass()
+        CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
+
+        # Initialize the points for cluster detection
+        query = overpassQueryBuilder(
+            bbox = bbox,
+            elementType = ['node'],
+            selector = ['"shop"~"^(bag|boutique|clothes)$"'],
+            includeCenter = True,
+            out = 'skel'
+        )
+
+        try:
+            result = self.overpass.query(query)
+        except Exception as e:
+            self.logger.error(f"Error fetching landmarks: {e}")
+
+        if len(result.elements()) > 0 :
+            points = []
+            for elem in result.elements() :
+                points.append(tuple((elem.lat(), elem.lon())))
+
+            self.all_points = np.array(points)
+            self.valid = True
+        else :
+            self.valid = False


-def get_clusters(points: list) -> tuple:
-    """
-    Apply DBSCAN to find clusters.
-    """
-    if len(points) > 400 :
-        dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities
-    else :
-        dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities
-
-    labels = dbscan.fit_predict(points)
-
-    # Separate clustered points and noise points
-    clustered_points = points[labels != -1]
-    clustered_labels = labels[labels != -1]
-
-    return clustered_points, clustered_labels
+    def generate_shopping_landmarks(self) -> list[Landmark]:
+
+        # First generate the clusters
+        self.generate_clusters()
+
+        # Then generate the shopping locations
+        self.generate_shopping_locations()
+
+        # Transform the locations in landmarks and return the list
+        shopping_landmarks = []
+        for location in self.shopping_locations :
+            shopping_landmarks.append(self.create_landmark(location))
+
+        return shopping_landmarks


-def filter_clusters(cluster_points, cluster_labels):
-    """
-    Remove clusters of less importance.
-    """
-    label_counts = np.bincount(cluster_labels)
-
-    # Get the indices (labels) of the 5 largest clusters
-    top_5_labels = np.argsort(label_counts)[-5:]
-
-    # Filter points to keep only the points in the top 5 clusters
-    filtered_cluster_points = []
-    filtered_cluster_labels = []
-
-    for label in top_5_labels:
-        filtered_cluster_points.append(cluster_points[cluster_labels == label])
-        filtered_cluster_labels.append(np.full((label_counts[label],), label)) # Replicate the label
-
-    # Concatenate filtered clusters into a single array
-    return np.vstack(filtered_cluster_points), np.concatenate(filtered_cluster_labels)
+    def generate_clusters(self) :
+
+        # Apply DBSCAN to find clusters. Choose different settings for different cities.
+        if len(self.all_points) > 200 :
+            dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities
+        else :
+            dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities
+
+        labels = dbscan.fit_predict(self.all_points)
+
+        # Separate clustered points and noise points
+        self.cluster_points = self.all_points[labels != -1]
+        self.cluster_labels = labels[labels != -1]
+
+        # filter the clusters to keep only the largest ones
+        self.filter_clusters()


-def fit_lines(points, labels):
-    """
-    Fit lines to identified clusters.
-    """
-    all_x = []
-    all_y = []
-    lines = []
-    locations = []
-
-    for label in set(labels):
-        cluster_points = points[labels == label]
-
-        # If there aren't enough points, skip
-        if len(cluster_points) < 2:
-            continue
-
-        # Apply PCA to find the principal component (i.e., the line of best fit)
-        pca = PCA(n_components=1)
-        pca.fit(cluster_points)
-
-        direction = pca.components_[0]
-        centroid = pca.mean_
-
-        # Project the cluster points onto the principal direction (line direction)
-        projections = np.dot(cluster_points - centroid, direction)
-
-        # Get the range of the projections to find the approximate length of the cluster
-        cluster_length = projections.max() - projections.min()
-
-        # Adjust `t` so that it scales with the cluster length
-        t = np.linspace(-cluster_length / 2.75, cluster_length / 2.75, 10)
-
-        # Calculate the start and end of the line based on min/max projections
-        start_point = centroid[0] + t*direction[0]
-        end_point = centroid[1] + t*direction[1]
-
-        # Store the line
-        lines.append((start_point, end_point))
-
-        # For visualization, store the points
-        all_x.append(min(start_point))
-        all_x.append(max(start_point))
-        all_y.append(min(end_point))
-        all_y.append(max(end_point))
-
-        if np.linalg.norm(t) <= 0.0045 :
-            loc = ShoppingLocation(
-                type='area',
-                centroid=tuple((centroid[1], centroid[0])),
-                importance = len(cluster_points),
-            )
-        else :
-            loc = ShoppingLocation(
-                type='street',
-                centroid=tuple((centroid[1], centroid[0])),
-                importance = len(cluster_points),
-                start=start_point,
-                end=end_point
-            )
-
-        locations.append(loc)
-
-    xmin = min(all_x)
-    xmax = max(all_x)
-    ymin = min(all_y)
-    ymax = max(all_y)
-    corners = (xmin, xmax, ymin, ymax)
-
-    return corners, locations
+    def generate_shopping_locations(self) :
+
+        locations = []
+
+        # loop through the different clusters
+        for label in set(self.cluster_labels):
+
+            # Extract points belonging to the current cluster
+            current_cluster = self.cluster_points[self.cluster_labels == label]
+
+            # Calculate the centroid as the mean of the points
+            centroid = np.mean(current_cluster, axis=0)
+
+            locations.append(ShoppingLocation(
+                type='area',
+                centroid=centroid,
+                importance = len(current_cluster)
+            ))
+
+        self.shopping_locations = locations
+
+
+    def create_landmark(self, shopping_location: ShoppingLocation) -> Landmark:
+
+        # Define the bounding box for a given radius around the coordinates
+        lat, lon = shopping_location.centroid
+        bbox = ("around:1000", str(lat), str(lon))
+
+        # Query neighborhoods and shopping malls
+        selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"', '"shop"="mall"']
+
+        min_dist = float('inf')
+        new_name = 'Shopping Area'
+        new_name_en = None
+        osm_id = 0
+        osm_type = 'node'
+
+        for sel in selectors :
+            query = overpassQueryBuilder(
+                bbox = bbox,
+                elementType = ['node', 'way', 'relation'],
+                selector = sel,
+                includeCenter = True,
+                out = 'center'
+            )
+
+            try:
+                result = self.overpass.query(query)
+            except Exception as e:
+                raise Exception("query unsuccessful")
+
+            for elem in result.elements():
+
+                location = (elem.centerLat(), elem.centerLon())
+
+                if location[0] is None :
+                    location = (elem.lat(), elem.lon())
+                    if location[0] is None :
+                        continue
+
+                d = get_distance(shopping_location.centroid, location)
+                if d < min_dist :
+                    min_dist = d
+                    new_name = elem.tag('name')
+                    osm_type = elem.type()      # Add type: 'way' or 'relation'
+                    osm_id = elem.id()          # Add OSM id
+
+                    # add english name if it exists
+                    try :
+                        new_name_en = elem.tag('name:en')
+                    except:
+                        pass
+
+        return Landmark(
+            name=new_name,
+            type='shopping',
+            location=shopping_location.centroid,    # TODO: use the fact that we can also recognize streets.
+            attractiveness=shopping_location.importance,
+            n_tags=0,
+            osm_id=osm_id,
+            osm_type=osm_type,
+            name_en=new_name_en
+        )
+
+
+    def filter_clusters(self):
+        """
+        Remove clusters of lesser importance.
+        """
+        label_counts = np.bincount(self.cluster_labels)
+
+        # Get the indices (labels) of the 5 largest clusters
+        top_5_labels = np.argsort(label_counts)[-5:]
+
+        # Keep only the points belonging to the top 5 clusters
+        filtered_cluster_points = []
+        filtered_cluster_labels = []
+
+        for label in top_5_labels:
+            filtered_cluster_points.append(self.cluster_points[self.cluster_labels == label])
+            filtered_cluster_labels.append(np.full((label_counts[label],), label)) # Replicate the label
+
+        # update the cluster points and labels with the filtered data
+        self.cluster_points = np.vstack(filtered_cluster_points)
+        self.cluster_labels = np.concatenate(filtered_cluster_labels)
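A note on the clustering settings: the points handed to `fit_predict` are raw (lat, lon) pairs, so DBSCAN's `eps` is measured in degrees. One degree of latitude is about 111 km, so `eps=0.00118` groups shops within roughly 130 m of one another and `eps=0.00075` within roughly 85 m (east-west distances shrink by cos(latitude), so the effective radius is slightly smaller in longitude). For reference, a minimal sketch of driving the new manager directly, using the Place Bellecour coordinates from the test above; the import path is an assumption and depends on the package layout:

    from src.utils.cluster_processing import ShoppingManager   # path is an assumption

    # "around" bbox in the same (radius, lat, lon) form used throughout the module
    bbox = ("around:1750", "45.7576485", "4.8330241")

    manager = ShoppingManager(bbox)
    if manager.valid :   # False when the Overpass query returned no shops
        for landmark in manager.generate_shopping_landmarks():
            print(landmark.name, landmark.location, landmark.attractiveness)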

View File

@@ -5,6 +5,7 @@ from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
 from ..structs.preferences import Preferences
 from ..structs.landmark import Landmark
 from .take_most_important import take_most_important
+from .cluster_processing import ShoppingManager
 from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR

@@ -94,10 +95,19 @@ class LandmarkManager:
         if preferences.shopping.score != 0:
             score_function = lambda score: score * 10 * preferences.shopping.score / 5
             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, score_function)
+
+            # set time for all shopping activities :
+            for landmark in current_landmarks : landmark.duration = 30
+
             all_landmarks.update(current_landmarks)

+            # special pipeline for shopping malls
+            shopping_manager = ShoppingManager(bbox)
+            if shopping_manager.valid :
+                shopping_clusters = shopping_manager.generate_shopping_landmarks()
+                for landmark in shopping_clusters : landmark.duration = 45
+                all_landmarks.update(shopping_clusters)
+
         landmarks_constrained = take_most_important(all_landmarks, self.N_important)
         self.logger.info(f'Generated {len(all_landmarks)} landmarks around {center_coordinates}, and constrained to {len(landmarks_constrained)} most important ones.')
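One detail worth noting: `all_landmarks` is a set, so merging the cluster landmarks through `update()` relies on Landmark being hashable (a comment removed from cluster_processing said exactly that: it "requires some __methods__ to be set in Landmark"). A hypothetical sketch of what those methods could look like, keyed on the OSM identity; the actual implementation lives in structs/landmark.py:

    # illustrative only, inside the Landmark model
    def __hash__(self) -> int:
        return hash((self.osm_type, self.osm_id))

    def __eq__(self, other) -> bool:
        return (self.osm_type, self.osm_id) == (other.osm_type, other.osm_id)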
@@ -353,7 +363,6 @@ class LandmarkManager:
         return return_list


 def dict_to_selector_list(d: dict) -> list:
     """
     Convert a dictionary of key-value pairs to a list of Overpass query strings.
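The diff is truncated here, mid-docstring. For context, a minimal sketch of what such a selector conversion could look like, matching the selector strings seen above (e.g. '"shop"="mall"' and '"shop"~"^(bag|boutique|clothes)$"'); this body is an illustration, not the file's actual code:

    def dict_to_selector_list(d: dict) -> list:
        """
        Convert a dictionary of key-value pairs to a list of Overpass query strings.
        """
        return_list = []
        for key, value in d.items():
            if isinstance(value, list):
                # a list of values becomes a regex alternation: "key"~"^(a|b)$"
                return_list.append(f'"{key}"~"^({"|".join(value)})$"')
            elif isinstance(value, str) and len(value) > 0:
                return_list.append(f'"{key}"="{value}"')
            else:
                # an empty value matches any value for the key
                return_list.append(f'"{key}"')
        return return_list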