anyway/backend/src/utils/cluster_manager.py
2025-01-28 11:52:07 +01:00

302 lines
12 KiB
Python

"""Find clusters of interest to add more general areas of visit to the tour."""
import logging
from typing import Literal
import numpy as np
from sklearn.cluster import DBSCAN
from pydantic import BaseModel
from ..overpass.overpass import Overpass, get_base_info
from ..structs.landmark import Landmark
from .get_time_distance import get_distance
from .utils import create_bbox
# silence the overpass logger
logging.getLogger('Overpass').setLevel(level=logging.CRITICAL)
class Cluster(BaseModel):
""""
A class representing an interesting area for shopping or sightseeing.
It can represent either a general area or a specifc route with start and end point.
The importance represents the number of shops found in this cluster.
Attributes:
type : either a 'street' or 'area' (representing a denser field of shops).
importance : size of the cluster (number of points).
centroid : center of the cluster.
start : if the type is a street it goes from here...
end : ...to here
"""
type: Literal['street', 'area']
importance: int
centroid: tuple
# start: Optional[list] = None # for later use if we want to have streets as well
# end: Optional[list] = None
class ClusterManager:
"""
A manager responsible for clustering points of interest, such as shops or historic sites,
to identify areas worth visiting. It uses the DBSCAN algorithm to detect clusters
based on a set of points retrieved from OpenStreetMap (OSM).
Attributes:
logger (logging.Logger): Logger for capturing relevant events and errors.
valid (bool): Indicates whether clusters were successfully identified.
all_points (list): All points retrieved from OSM, representing locations of interest.
cluster_points (list): Points identified as part of a cluster.
cluster_labels (list): Labels corresponding to the clusters each point belongs to.
cluster_type (Literal['sightseeing', 'shopping']): Type of clustering, either for sightseeing
landmarks or shopping areas.
"""
logger = logging.getLogger(__name__)
# NOTE: all points are in (lat, lon) format
valid: bool # Ensure the manager is valid (ie there are some clusters to be found)
all_points: list
cluster_points: list
cluster_labels: list
cluster_type: Literal['sightseeing', 'shopping']
def __init__(self, bbox: tuple, cluster_type: Literal['sightseeing', 'shopping']) -> None:
"""
Upon intialization, generate the point cloud used for cluster detection.
The points represent bag/clothes shops and general boutiques.
If the first step is successful, it applies the DBSCAN clustering algorithm with different
parameters depending on the size of the city (number of points).
It filters out noise points and keeps only the largest clusters.
A successful initialization updates:
- `self.cluster_points`: The points belonging to clusters.
- `self.cluster_labels`: The labels for the points in clusters.
The method also calls `filter_clusters()` to retain only the largest clusters.
Args:
bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
"""
# Setup the caching in the Overpass class.
self.overpass = Overpass()
self.cluster_type = cluster_type
if cluster_type == 'shopping' :
osm_types = ['node']
sel = '"shop"~"^(bag|boutique|clothes)$"'
out = 'ids center'
elif cluster_type == 'sightseeing' :
osm_types = ['way']
sel = '"historic"~"^(monument|building|yes)$"'
out = 'ids center'
else :
raise NotImplementedError("Please choose only an available option for cluster detection")
# Initialize the points for cluster detection
try:
result = self.overpass.send_query(
bbox = bbox,
osm_types = osm_types,
selector = sel,
out = out
)
except Exception as e:
self.logger.error(f"Error fetching clusters: {e}")
if result is None :
self.logger.debug(f"Found no {cluster_type} clusters, overpass query returned no datapoints.")
self.valid = False
else :
points = []
for elem in result:
osm_type = elem.get('type')
# Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type)
if coords is not None :
points.append(coords)
if points :
self.all_points = np.array(points)
# Apply DBSCAN to find clusters. Choose different settings for different cities.
if self.cluster_type == 'shopping' and len(self.all_points) > 200 :
dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities
elif self.cluster_type == 'sightseeing' :
dbscan = DBSCAN(eps=0.0025, min_samples=15, algorithm='kd_tree') # for historic neighborhoods
else :
dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities
labels = dbscan.fit_predict(self.all_points)
# Check that there are is least 1 cluster
if len(set(labels)) > 1 :
self.logger.info(f"Found {len(set(labels))} different {cluster_type} clusters.")
# Separate clustered points and noise points
self.cluster_points = self.all_points[labels != -1]
self.cluster_labels = labels[labels != -1]
self.filter_clusters() # ValueError here sometimes. I dont know why. # Filter the clusters to keep only the largest ones.
self.valid = True
else :
self.logger.info(f"Found 0 {cluster_type} clusters.")
self.valid = False
else :
self.logger.debug(f"Detected 0 {cluster_type} clusters.")
self.valid = False
def generate_clusters(self) -> list[Landmark]:
"""
Generate a list of landmarks based on identified clusters.
This method iterates over the different clusters, calculates the centroid
(as the mean of the points within each cluster), and assigns an importance
based on the size of the cluster.
The generated shopping locations are stored in `self.clusters`
as a list of `Cluster` objects, each with:
- `type`: Set to 'area'.
- `centroid`: The calculated centroid of the cluster.
- `importance`: The number of points in the cluster.
"""
if not self.valid :
return [] # Return empty list if no clusters were found
locations = []
# loop through the different clusters
for label in set(self.cluster_labels):
# Extract points belonging to the current cluster
current_cluster = self.cluster_points[self.cluster_labels == label]
# Calculate the centroid as the mean of the points
centroid = np.mean(current_cluster, axis=0)
if self.cluster_type == 'shopping' :
score = len(current_cluster)*2
else :
score = len(current_cluster)*8
locations.append(Cluster(
type='area',
centroid=centroid,
importance = score
))
# Transform the locations in landmarks and return the list
cluster_landmarks = []
for cluster in locations :
cluster_landmarks.append(self.create_landmark(cluster))
return cluster_landmarks
def create_landmark(self, cluster: Cluster) -> Landmark:
"""
Create a Landmark object based on the given shopping location.
This method queries the Overpass API for nearby neighborhoods and shopping malls
within a 1000m radius around the shopping location centroid. It selects the closest
result and creates a landmark with the associated details such as name, type, and OSM ID.
Parameters:
shopping_location (Cluster): A Cluster object containing
the centroid and importance of the area.
Returns:
Landmark: A Landmark object containing details such as the name, type,
location, attractiveness, and OSM details.
"""
# Define the bounding box for a given radius around the coordinates
bbox = create_bbox(cluster.centroid, 1000)
# Query neighborhoods and shopping malls
selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"']
if self.cluster_type == 'shopping' :
selectors.append('"shop"="mall"')
new_name = 'Shopping Area'
t = 40
else :
new_name = 'Neighborhood'
t = 15
min_dist = float('inf')
osm_id = 0
osm_type = 'node'
osm_types = ['node', 'way', 'relation']
for sel in selectors :
try:
result = self.overpass.send_query(bbox = bbox,
osm_types = osm_types,
selector = sel,
out = 'ids center'
)
except Exception as e:
self.logger.error(f"Error fetching clusters: {e}")
continue
if result is None :
self.logger.error(f"Error fetching clusters: {e}")
continue
for elem in result:
osm_type = elem.get('type')
id, coords, name = get_base_info(elem, osm_type, with_name=True)
if name is None or coords is None :
continue
d = get_distance(cluster.centroid, coords)
if d < min_dist :
min_dist = d
new_name = name
osm_type = osm_type # Add type: 'way' or 'relation'
osm_id = id # Add OSM id
return Landmark(
name=new_name,
type=self.cluster_type,
location=cluster.centroid, # later: use the fact the we can also recognize streets.
attractiveness=cluster.importance,
n_tags=0,
osm_id=osm_id,
osm_type=osm_type,
duration=t
)
def filter_clusters(self):
"""
Filter clusters to retain only the 5 largest clusters by point count.
This method calculates the size of each cluster and filters out all but the
5 largest clusters. It then updates the cluster points and labels to reflect
only those from the top 5 clusters.
"""
label_counts = np.bincount(self.cluster_labels)
# Step 3: Get the indices (labels) of the 5 largest clusters
top_5_labels = np.argsort(label_counts)[-5:] # Get the largest 5 clusters
# Step 4: Filter points to keep only the points in the top 5 clusters
filtered_cluster_points = []
filtered_cluster_labels = []
for label in top_5_labels:
filtered_cluster_points.append(self.cluster_points[self.cluster_labels == label])
filtered_cluster_labels.append(np.full((label_counts[label],), label)) # Replicate the label
# update the cluster points and labels with the filtered data
self.cluster_points = np.vstack(filtered_cluster_points) # ValueError here
self.cluster_labels = np.concatenate(filtered_cluster_labels)