anyway/backend/src/utils/cluster_processing.py
Helldragon67 a4c435c398
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 2m30s
Run linting on the backend code / Build (pull_request) Failing after 28s
Run testing on the backend code / Build (pull_request) Failing after 1m37s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 16s
better pep8
2024-12-04 19:23:26 +01:00

284 lines
10 KiB
Python

import logging
from typing import Literal
import numpy as np
from sklearn.cluster import DBSCAN
from pydantic import BaseModel
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
from ..structs.landmark import Landmark
from ..utils.get_time_separation import get_distance
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR
class ShoppingLocation(BaseModel):
    """
    A class representing an interesting area for shopping.

    The importance represents the number of shops found in this cluster.
    Describing a specific street with a start and an end point is planned
    (hence the 'street' type) but is not implemented yet: see the TODO below.

    Attributes:
        type : either a 'street' or 'area' (representing a denser field of shops).
        importance : size of the cluster (number of points).
        centroid : center of the cluster.
    """

    type: Literal['street', 'area']
    importance: int
    centroid: tuple

    # TODO: for later use if we want to have streets as well
    # (if the type is a street it goes from `start` to `end`):
    # start: Optional[list] = None
    # end: Optional[list] = None
class ShoppingManager:
    """
    Detect shopping areas around a location and expose them as landmarks.

    Shops tagged as bag, boutique or clothes are fetched through the Overpass
    API, clustered with DBSCAN, and the largest clusters are converted into
    `Landmark` objects by `generate_shopping_landmarks()`.
    """

    logger = logging.getLogger(__name__)

    # NOTE: all points are in (lat, lon) format
    valid: bool                 # True when at least one shop was fetched
    all_points: np.ndarray      # every fetched shop, one (lat, lon) row each
    cluster_points: np.ndarray  # points that belong to a retained cluster
    cluster_labels: np.ndarray  # cluster label of each row of cluster_points
    shopping_locations: list["ShoppingLocation"]

    def __init__(self, bbox: tuple) -> None:
        """
        Upon initialization, generate the point cloud used for cluster detection.
        The points represent bag/clothes shops and general boutiques.

        If the query fails or returns nothing, the manager is flagged as not
        valid and `generate_shopping_landmarks()` returns an empty list.

        Args:
            bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
        """
        # Start from a consistent empty state so that every attribute exists
        # whatever happens with the query below.
        self.valid = False
        self.all_points = np.empty((0, 2))
        self.cluster_points = np.empty((0, 2))
        self.cluster_labels = np.array([], dtype=int)
        self.shopping_locations = []

        # Initialize overpass and cache
        self.overpass = Overpass()
        CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)

        # Initialize the points for cluster detection
        query = overpassQueryBuilder(
            bbox = bbox,
            elementType = ['node'],
            selector = ['"shop"~"^(bag|boutique|clothes)$"'],
            includeCenter = True,
            out = 'skel'
        )

        try:
            result = self.overpass.query(query)
        except Exception as e:
            # Broad on purpose: the failure modes of the Overpass client are
            # not visible from here. Without a result there is nothing to
            # cluster, so stop instead of reading an unbound `result`.
            self.logger.error("Error fetching landmarks: %s", e)
            return

        elements = result.elements()
        if not elements:
            return

        self.all_points = np.array([(elem.lat(), elem.lon()) for elem in elements])
        self.valid = True

    def generate_shopping_landmarks(self) -> list["Landmark"]:
        """
        Generate shopping landmarks based on clustered locations.

        This method first generates clusters of locations and then extracts
        shopping-related locations from these clusters. It transforms each
        shopping location into a `Landmark` object.

        Returns:
            list[Landmark]: A list of `Landmark` objects representing shopping
            locations. Returns an empty list if the manager is not valid or if
            no clusters are found.
        """
        # No shops were fetched: there is nothing to cluster.
        if not self.valid:
            return []

        self.generate_clusters()
        if len(self.cluster_labels) == 0:
            return []   # Return empty list if no clusters were found

        # Then generate the shopping locations
        self.generate_shopping_locations()

        # Transform the locations in landmarks and return the list
        return [self.create_landmark(location) for location in self.shopping_locations]

    def generate_clusters(self) -> None:
        """
        Generate clusters of points using DBSCAN.

        This method applies the DBSCAN clustering algorithm with different
        parameters depending on the size of the city (number of points).
        It filters out noise points and keeps only the largest clusters.

        The method updates:
            - `self.cluster_points`: The points belonging to clusters.
            - `self.cluster_labels`: The labels for the points in clusters.

        The method also calls `filter_clusters()` to retain only the largest clusters.
        """
        # Nothing to fit: expose empty results rather than calling DBSCAN
        # on an empty point cloud.
        if len(self.all_points) == 0:
            self.cluster_points = np.empty((0, 2))
            self.cluster_labels = np.array([], dtype=int)
            return

        # Apply DBSCAN to find clusters. Choose different settings for different cities.
        # eps is expressed in the unit of the points, i.e. raw (lat, lon) values.
        if len(self.all_points) > 200:
            dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree')  # for large cities
        else:
            dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree')  # for small cities

        labels = dbscan.fit_predict(self.all_points)

        # Separate clustered points and noise points (noise is labelled -1)
        clustered = labels != -1
        self.cluster_points = self.all_points[clustered]
        self.cluster_labels = labels[clustered]

        # filter the clusters to keep only the largest ones
        self.filter_clusters()

    def generate_shopping_locations(self) -> None:
        """
        Generate shopping locations based on clustered points.

        This method iterates over the different clusters, calculates the centroid
        (as the mean of the points within each cluster), and assigns an importance
        based on the size of the cluster.

        The generated shopping locations are stored in `self.shopping_locations`
        as a list of `ShoppingLocation` objects, each with:
            - `type`: Set to 'area'.
            - `centroid`: The calculated centroid of the cluster.
            - `importance`: The number of points in the cluster.
        """
        locations = []

        # loop through the different clusters, in ascending label order so the
        # resulting list is deterministic
        for label in sorted(set(self.cluster_labels)):
            # Extract points belonging to the current cluster
            current_cluster = self.cluster_points[self.cluster_labels == label]

            # Calculate the centroid as the mean of the points
            centroid = np.mean(current_cluster, axis=0)

            locations.append(ShoppingLocation(
                type='area',
                # hand the model a plain tuple of floats, not a numpy array
                centroid=tuple(centroid.tolist()),
                importance=len(current_cluster),
            ))

        self.shopping_locations = locations

    def create_landmark(self, shopping_location: "ShoppingLocation") -> "Landmark":
        """
        Create a Landmark object based on the given shopping location.

        This method queries the Overpass API for nearby neighborhoods and shopping
        malls within a 1000m radius around the shopping location centroid. It
        selects the closest named result and creates a landmark with the
        associated details such as name, type, and OSM ID. If nothing suitable
        is found, the landmark is called 'Shopping Area' and gets OSM id 0.

        Args:
            shopping_location (ShoppingLocation): A ShoppingLocation object containing
                the centroid and importance of the area.

        Returns:
            Landmark: A Landmark object containing details such as the name, type,
            location, attractiveness, and OSM details.
        """
        # Define the bounding box for a given radius around the coordinates
        lat, lon = shopping_location.centroid
        bbox = ("around:1000", str(lat), str(lon))

        # Query neighborhoods and shopping malls
        selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"', '"shop"="mall"']

        # Fallback values used when no suitable element is found
        min_dist = float('inf')
        new_name = 'Shopping Area'
        new_name_en = None
        osm_id = 0
        osm_type = 'node'

        for sel in selectors:
            query = overpassQueryBuilder(
                bbox = bbox,
                elementType = ['node', 'way', 'relation'],
                selector = sel,
                includeCenter = True,
                out = 'center'
            )

            try:
                result = self.overpass.query(query)
            except Exception as e:
                # Best effort: a failing selector must not prevent the
                # landmark from being created with what is already known.
                self.logger.error("Error fetching landmarks: %s", e)
                continue

            for elem in result.elements():
                # Prefer the computed center, fall back on the element's own
                # coordinates, and skip elements that have neither.
                location = (elem.centerLat(), elem.centerLon())
                if location[0] is None:
                    location = (elem.lat(), elem.lon())
                if location[0] is None:
                    continue

                d = get_distance(shopping_location.centroid, location)
                if d >= min_dist:
                    continue

                # An element without a name cannot name the area: ignore it
                # instead of replacing the current name with None.
                name = elem.tag('name')
                if name is None:
                    continue

                min_dist = d
                new_name = name
                osm_type = elem.type()  # Add type: 'node', 'way' or 'relation'
                osm_id = elem.id()      # Add OSM id

                # Add english name if it exists. Reset it on failure so that
                # the english name of a previously selected element is never
                # attached to this one.
                try:
                    new_name_en = elem.tag('name:en')
                except (KeyError, TypeError):
                    new_name_en = None

        return Landmark(
            name=new_name,
            type='shopping',
            location=shopping_location.centroid,  # TODO: use the fact the we can also recognize streets.
            attractiveness=shopping_location.importance,
            n_tags=0,
            osm_id=osm_id,
            osm_type=osm_type,
            name_en=new_name_en
        )

    def filter_clusters(self, max_clusters: int = 5) -> None:
        """
        Filter clusters to retain only the largest clusters by point count.

        This method calculates the size of each cluster and filters out all but
        the `max_clusters` largest ones. It then updates `self.cluster_points`
        and `self.cluster_labels` to reflect only those clusters. Points keep
        their relative order and stay aligned with their labels. When there is
        no clustered point at all, both attributes are left untouched.

        Args:
            max_clusters: Maximum number of clusters to keep (5 by default).

        Raises:
            ValueError: If `max_clusters` is smaller than 1.
        """
        if max_clusters < 1:
            raise ValueError("max_clusters must be at least 1")

        # DBSCAN may classify every point as noise, leaving nothing to filter.
        if len(self.cluster_labels) == 0:
            return

        # Size of each cluster, indexed by label
        label_counts = np.bincount(self.cluster_labels)

        # Labels of the largest clusters (argsort is ascending, hence the tail)
        top_labels = np.argsort(label_counts)[-max_clusters:]

        # Keep only the points whose label is one of the largest clusters
        keep = np.isin(self.cluster_labels, top_labels)
        self.cluster_points = self.cluster_points[keep]
        self.cluster_labels = self.cluster_labels[keep]