"""Find clusters of interest to add more general areas of visit to the tour."""
 | 
						|
import logging
 | 
						|
from typing import Literal
 | 
						|
 | 
						|
import numpy as np
 | 
						|
from sklearn.cluster import DBSCAN
 | 
						|
from pydantic import BaseModel
 | 
						|
 | 
						|
from ..overpass.overpass import Overpass, get_base_info
 | 
						|
from ..structs.landmark import Landmark
 | 
						|
from .get_time_distance import get_distance
 | 
						|
from ..constants import OSM_CACHE_DIR
 | 
						|
 | 
						|
 | 
						|
# silence the overpass logger
 | 
						|
logging.getLogger('Overpass').setLevel(level=logging.CRITICAL)
 | 
						|
 | 
						|
 | 
						|
class Cluster(BaseModel):
 | 
						|
    """"
 | 
						|
    A class representing an interesting area for shopping or sightseeing.
 | 
						|
    
 | 
						|
    It can represent either a general area or a specifc route with start and end point.
 | 
						|
    The importance represents the number of shops found in this cluster.
 | 
						|
    
 | 
						|
    Attributes:
 | 
						|
        type :       either a 'street' or 'area' (representing a denser field of shops).
 | 
						|
        importance : size of the cluster (number of points).
 | 
						|
        centroid :   center of the cluster.
 | 
						|
        start :      if the type is a street it goes from here...
 | 
						|
        end :        ...to here
 | 
						|
    """
 | 
						|
    type: Literal['street', 'area']
 | 
						|
    importance: int
 | 
						|
    centroid: tuple
 | 
						|
    # start: Optional[list] = None      # for later use if we want to have streets as well
 | 
						|
    # end: Optional[list] = None
 | 
						|
 | 
						|
 | 
						|
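# Illustrative example of the data model above (made-up values, not executed anywhere
# in this module): a shopping area of 30 points near central Paris might be represented as
#   Cluster(type='area', importance=30, centroid=(48.8566, 2.3522))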

class ClusterManager:
    """
    A manager responsible for clustering points of interest, such as shops or historic sites,
    to identify areas worth visiting. It uses the DBSCAN algorithm to detect clusters
    based on a set of points retrieved from OpenStreetMap (OSM).

    Attributes:
        logger (logging.Logger): Logger for capturing relevant events and errors.
        valid (bool): Indicates whether clusters were successfully identified.
        all_points (list): All points retrieved from OSM, representing locations of interest.
        cluster_points (list): Points identified as part of a cluster.
        cluster_labels (list): Labels corresponding to the clusters each point belongs to.
        cluster_type (Literal['sightseeing', 'shopping']): Type of clustering, either for sightseeing
            landmarks or shopping areas.
    """
    logger = logging.getLogger(__name__)

    # NOTE: all points are in (lat, lon) format
    valid: bool             # Whether clusters were successfully identified (set in __init__)
    all_points: list
    cluster_points: list
    cluster_labels: list
    cluster_type: Literal['sightseeing', 'shopping']

    def __init__(self, bbox: tuple, cluster_type: Literal['sightseeing', 'shopping']) -> None:
        """
        Upon initialization, generate the point cloud used for cluster detection.
        For the 'shopping' type the points represent bag, boutique and clothes shops;
        for 'sightseeing' they represent historic monuments and buildings.
        If the first step is successful, it applies the DBSCAN clustering algorithm with different
        parameters depending on the size of the city (number of points).
        It filters out noise points and keeps only the largest clusters.

        A successful initialization updates:
            - `self.cluster_points`: The points belonging to clusters.
            - `self.cluster_labels`: The labels for the points in clusters.

        The method also calls `filter_clusters()` to retain only the largest clusters.

        Args:
            bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
        """
        # Set up caching in the Overpass class.
        self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)

        self.cluster_type = cluster_type
        if cluster_type == 'shopping':
            osm_types = ['node']
            sel = '"shop"~"^(bag|boutique|clothes)$"'
            out = 'ids center'
        elif cluster_type == 'sightseeing':
            osm_types = ['way']
            sel = '"historic"~"^(monument|building|yes)$"'
            out = 'ids center'
        else:
            raise NotImplementedError("Please choose only an available option for cluster detection")

        # Initialize the points for cluster detection
        query = self.overpass.build_query(
            area = bbox,
            osm_types = osm_types,
            selector = sel,
            out = out
        )
        self.logger.debug(f"Cluster query: {query}")

        try:
            result = self.overpass.send_query(query)
        except Exception as e:
            # The query failed; mark the manager as invalid and stop here.
            self.logger.error(f"Error fetching {cluster_type} clusters: {e}")
            self.valid = False
            return

        if result is None:
            self.logger.error(f"Error fetching {cluster_type} clusters, overpass query returned None.")
            self.valid = False

        else:
            points = []
            for osm_type in osm_types:
                for elem in result.findall(osm_type):

                    # Get coordinates and append them to the points list
                    _, coords = get_base_info(elem, osm_type)
                    if coords is not None:
                        points.append(coords)

            if points:
                self.all_points = np.array(points)

                # Apply DBSCAN to find clusters. Choose different settings for different cities.
                # eps is in degrees of latitude/longitude (0.001 degrees is roughly 111 m of latitude).
                if self.cluster_type == 'shopping' and len(self.all_points) > 200:
                    dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree')  # for large cities
                elif self.cluster_type == 'sightseeing':
                    dbscan = DBSCAN(eps=0.0025, min_samples=15, algorithm='kd_tree')   # for historic neighborhoods
                else:
                    dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree')  # for small cities

                labels = dbscan.fit_predict(self.all_points)

                # Check that there is at least 1 cluster
                if len(set(labels)) > 1:
                    self.logger.debug(f"Found {len(set(labels))} different clusters.")
                    # Separate clustered points from noise points
                    self.cluster_points = self.all_points[labels != -1]
                    self.cluster_labels = labels[labels != -1]
                    self.filter_clusters()      # Keep only the largest clusters. (Occasionally raises ValueError; cause not yet identified.)
                    self.valid = True

                else:
                    self.logger.debug(f"Detected 0 {cluster_type} clusters.")
                    self.valid = False

            else:
                self.logger.debug(f"Detected 0 {cluster_type} clusters.")
                self.valid = False

    def generate_clusters(self) -> list[Landmark]:
        """
        Generate a list of landmarks based on the identified clusters.

        This method iterates over the different clusters, calculates the centroid
        (as the mean of the points within each cluster), and assigns an importance
        based on the size of the cluster.

        Each cluster is first represented as a `Cluster` object with:
            - `type`: Set to 'area'.
            - `centroid`: The calculated centroid of the cluster.
            - `importance`: A score derived from the number of points in the cluster.

        The clusters are then converted to `Landmark` objects via `create_landmark()`
        and returned as a list.
        """
        if not self.valid:
            return []       # Return an empty list if no clusters were found

        locations = []

        # Loop through the different clusters
        for label in set(self.cluster_labels):

            # Extract the points belonging to the current cluster
            current_cluster = self.cluster_points[self.cluster_labels == label]

            # Calculate the centroid as the mean of the points
            centroid = np.mean(current_cluster, axis=0)

            # Weight sightseeing clusters more heavily than shopping clusters
            if self.cluster_type == 'shopping':
                score = len(current_cluster)*2
            else:
                score = len(current_cluster)*8

            locations.append(Cluster(
                type='area',
                centroid=centroid,
                importance=score
            ))

        # Transform the locations into landmarks and return the list
        cluster_landmarks = []
        for cluster in locations:
            cluster_landmarks.append(self.create_landmark(cluster))

        return cluster_landmarks

    def create_landmark(self, cluster: Cluster) -> Landmark:
        """
        Create a Landmark object based on the given cluster.

        This method queries the Overpass API for nearby neighborhoods (and, for shopping
        clusters, shopping malls) within a 1000 m radius around the cluster centroid.
        It selects the closest result and creates a landmark with the associated details
        such as name, type, and OSM ID.

        Parameters:
            cluster (Cluster): A Cluster object containing
            the centroid and importance of the area.

        Returns:
            Landmark: A Landmark object containing details such as the name, type,
            location, attractiveness, and OSM details.
        """
        # Define the bounding box for a given radius around the coordinates
        lat, lon = cluster.centroid
        bbox = (1000, lat, lon)

        # Query neighborhoods and shopping malls
        selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"']

        if self.cluster_type == 'shopping':
            selectors.append('"shop"="mall"')
            new_name = 'Shopping Area'
            t = 40
        else:
            new_name = 'Neighborhood'
            t = 15

        min_dist = float('inf')
        osm_id = 0
        osm_type = 'node'
        osm_types = ['node', 'way', 'relation']

        for sel in selectors:
            query = self.overpass.build_query(
                area = bbox,
                osm_types = osm_types,
                selector = sel,
                out = 'ids center'
            )

            try:
                result = self.overpass.send_query(query)
            except Exception as e:
                self.logger.error(f"Error fetching landmarks: {e}")
                continue

            if result is None:
                self.logger.error("Error fetching landmarks: overpass query returned None.")
                continue

            for this_osm_type in osm_types:
                for elem in result.findall(this_osm_type):

                    id, coords, name = get_base_info(elem, this_osm_type, with_name=True)

                    if name is None or coords is None:
                        continue

                    d = get_distance(cluster.centroid, coords)
                    if d < min_dist:
                        min_dist = d
                        new_name = name
                        osm_type = this_osm_type    # Remember the OSM type of the closest match
                        osm_id = id                 # Remember its OSM id

        return Landmark(
            name=new_name,
            type=self.cluster_type,
            location=cluster.centroid,              # later: use the fact that we can also recognize streets.
            attractiveness=cluster.importance,
            n_tags=0,
            osm_id=osm_id,
            osm_type=osm_type,
            duration=t
        )

    def filter_clusters(self):
        """
        Filter clusters to retain only the 5 largest clusters by point count.

        This method calculates the size of each cluster and filters out all but the
        5 largest clusters. It then updates the cluster points and labels to reflect
        only those from the top 5 clusters.
        """
        label_counts = np.bincount(self.cluster_labels)

        # Get the indices (labels) of the 5 largest clusters
        top_5_labels = np.argsort(label_counts)[-5:]

        # Filter points to keep only the points in the top 5 clusters
        filtered_cluster_points = []
        filtered_cluster_labels = []

        for label in top_5_labels:
            filtered_cluster_points.append(self.cluster_points[self.cluster_labels == label])
            filtered_cluster_labels.append(np.full((label_counts[label],), label))  # Replicate the label

        # Update the cluster points and labels with the filtered data
        self.cluster_points = np.vstack(filtered_cluster_points)        # Occasionally raises ValueError; cause not yet identified.
        self.cluster_labels = np.concatenate(filtered_cluster_labels)
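
# Minimal usage sketch (illustrative values only, not part of the module's API surface):
# the bounding box follows the (around:radius_in_m, center_lat, center_lon) convention
# documented in ClusterManager.__init__, here roughly central Paris. Running this issues
# live Overpass queries, and because of the relative imports above the module must be
# executed as part of its package (e.g. with `python -m`).
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    manager = ClusterManager(bbox=(1500, 48.8566, 2.3522), cluster_type='shopping')
    for landmark in manager.generate_clusters():
        print(landmark)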