# pylint: skip-file import numpy as np import json import os from typing import Optional, Literal from sklearn.cluster import DBSCAN from sklearn.decomposition import PCA import matplotlib.pyplot as plt from pydantic import BaseModel from OSMPythonTools.overpass import Overpass, overpassQueryBuilder from OSMPythonTools.cachingStrategy import CachingStrategy, JSON from math import sin, cos, sqrt, atan2, radians EARTH_RADIUS_KM = 6373 class ShoppingLocation(BaseModel): type: Literal['street', 'area'] importance: int centroid: tuple start: Optional[list] = None end: Optional[list] = None # Output to frontend class Landmark(BaseModel) : # Properties of the landmark name : str type: Literal['sightseeing', 'nature', 'shopping', 'start', 'finish'] location : tuple osm_type : str osm_id : int attractiveness : int n_tags : int image_url : Optional[str] = None website_url : Optional[str] = None description : Optional[str] = None # TODO future duration : Optional[int] = 0 name_en : Optional[str] = None # Additional properties depending on specific tour must_do : Optional[bool] = False must_avoid : Optional[bool] = False is_secondary : Optional[bool] = False time_to_reach_next : Optional[int] = 0 next_uuid : Optional[str] = None def extract_points(filestr: str) : """ Extract points from geojson file. Returns : np.array containing the points """ points = [] with open(os.path.dirname(__file__) + '/' + filestr, 'r') as f: geojson = json.load(f) for feature in geojson['features']: if feature['geometry']['type'] == 'Point': centroid = feature['geometry']['coordinates'] points.append(centroid) elif feature['geometry']['type'] == 'Polygon': centroid = np.array(feature['geometry']['coordinates'][0][0]) points.append(centroid) # Convert the list of points to a NumPy array return np.array(points) def get_distance(p1: tuple[float, float], p2: tuple[float, float]) -> int: """ Calculate the time in minutes to travel from one location to another. Args: p1 (tuple[float, float]): Coordinates of the starting location. p2 (tuple[float, float]): Coordinates of the destination. Returns: int: Time to travel from p1 to p2 in minutes. """ if p1 == p2: return 0 else: # Compute the distance in km along the surface of the Earth # (assume spherical Earth) # this is the haversine formula, stolen from stackoverflow # in order to not use any external libraries lat1, lon1 = radians(p1[0]), radians(p1[1]) lat2, lon2 = radians(p2[0]), radians(p2[1]) dlon = lon2 - lon1 dlat = lat2 - lat1 a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2 c = 2 * atan2(sqrt(a), sqrt(1 - a)) return EARTH_RADIUS_KM * c def filter_clusters(cluster_points, cluster_labels): """ Remove clusters of less importance. """ label_counts = np.bincount(cluster_labels) # Step 3: Get the indices (labels) of the 5 largest clusters top_5_labels = np.argsort(label_counts)[-5:] # Get the largest 5 clusters # Step 4: Filter points to keep only the points in the top 5 clusters filtered_cluster_points = [] filtered_cluster_labels = [] for label in top_5_labels: filtered_cluster_points.append(cluster_points[cluster_labels == label]) filtered_cluster_labels.append(np.full((label_counts[label],), label)) # Replicate the label # Concatenate filtered clusters into a single array return np.vstack(filtered_cluster_points), np.concatenate(filtered_cluster_labels) def fit_lines(points, labels): """ Fit lines to identified clusters. """ all_x = [] all_y = [] lines = [] locations = [] for label in set(labels): cluster_points = points[labels == label] # If there's not enough points, skip if len(cluster_points) < 2: continue # Apply PCA to find the principal component (i.e., the line of best fit) pca = PCA(n_components=1) pca.fit(cluster_points) direction = pca.components_[0] centroid = pca.mean_ # Project the cluster points onto the principal direction (line direction) projections = np.dot(cluster_points - centroid, direction) # Get the range of the projections to find the approximate length of the cluster cluster_length = projections.max() - projections.min() # Now adjust `t` so that it scales with the cluster length t = np.linspace(-cluster_length / 2.75, cluster_length / 2.75, 10) # Calculate the start and end of the line based on min/max projections start_point = centroid[0] + t*direction[0] end_point = centroid[1] + t*direction[1] # Store the line lines.append((start_point, end_point)) # For visualization, store the points all_x.append(min(start_point)) all_x.append(max(start_point)) all_y.append(min(end_point)) all_y.append(max(end_point)) if np.linalg.norm(t) <= 0.0045 : loc = ShoppingLocation( type='area', centroid=tuple((centroid[1], centroid[0])), importance = len(cluster_points), ) else : loc = ShoppingLocation( type='street', centroid=tuple((centroid[1], centroid[0])), importance = len(cluster_points), start=start_point, end=end_point ) locations.append(loc) xmin = min(all_x) xmax = max(all_x) ymin = min(all_y) ymax = max(all_y) corners = (xmin, xmax, ymin, ymax) return corners, locations def create_landmark(shopping_location: ShoppingLocation): # Define the bounding box for a given radius around the coordinates lat, lon = shopping_location.centroid bbox = ("around:1000", str(lat), str(lon)) overpass = Overpass() # CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR) # Query neighborhoods and shopping malls selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"', '"shop"="mall"'] min_dist = float('inf') new_name = 'Shopping Area' new_name_en = None osm_id = 0 osm_type = 'node' for sel in selectors : query = overpassQueryBuilder( bbox = bbox, elementType = ['node', 'way', 'relation'], selector = sel, includeCenter = True, out = 'center' ) try: result = overpass.query(query) except Exception as e: raise Exception("query unsuccessful") for elem in result.elements(): location = (elem.centerLat(), elem.centerLon()) if location[0] is None : location = (elem.lat(), elem.lon()) if location[0] is None : # print(f"Fetching coordinates failed with {elem.type()}/{elem.id()}") continue # print(f"Distance : {get_distance(shopping_location.centroid, location)}") d = get_distance(shopping_location.centroid, location) if d < min_dist : min_dist = d new_name = elem.tag('name') osm_type = elem.type() # Add type: 'way' or 'relation' osm_id = elem.id() # Add OSM id # add english name if it exists try : new_name_en = elem.tag('name:en') except: pass return Landmark( name=new_name, type='shopping', location=shopping_location.centroid, # TODO: use the fact the we can also recognize streets. attractiveness=shopping_location.importance, n_tags=0, osm_id=osm_id, osm_type=osm_type, name_en=new_name_en ) # Extract points points = extract_points('vienna_data.json') # print(len(points)) ######## Create a figure with 1 row and 3 columns for side-by-side plots fig, axes = plt.subplots(1, 3, figsize=(15, 5)) # Plot Raw data points axes[0].set_title('Raw Data') axes[0].scatter(points[:, 0], points[:, 1], color='blue', s=20) # Apply DBSCAN to find clusters. Choose different settings for different cities. if len(points) > 400 : dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities else : dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities labels = dbscan.fit_predict(points) # Separate clustered points and noise points clustered_points = points[labels != -1] clustered_labels = labels[labels != -1] noise_points = points[labels == -1] ######## Plot n°1: DBSCAN Clustering Results axes[1].set_title('DBSCAN Clusters') axes[1].scatter(clustered_points[:, 0], clustered_points[:, 1], c=clustered_labels, cmap='rainbow', s=20) axes[1].scatter(noise_points[:, 0], noise_points[:, 1], c='blue', s=7, label='Noise') # Keep the 5 biggest clusters clustered_points, clustered_labels = filter_clusters(clustered_points, clustered_labels) # Fit lines corners, locations = fit_lines(clustered_points, clustered_labels) (xmin, xmax, ymin, ymax) = corners ######## Plot clustered points in normal size and noise points separately axes[2].scatter(clustered_points[:, 0], clustered_points[:, 1], c=clustered_labels, cmap='rainbow', s=30) axes[2].set_title('PCA Fitted Lines on Clusters') # Create a list of Landmarks for the shopping things shopping_landmarks = [] for loc in locations : axes[2].scatter(loc.centroid[1], loc.centroid[0], color='red', marker='x', s=200, linewidth=3) landmark = create_landmark(loc) shopping_landmarks.append(landmark) axes[2].text(loc.centroid[1], loc.centroid[0], landmark.name, ha='center', va='top', fontsize=6, bbox=dict(facecolor='white', edgecolor='black', boxstyle='round,pad=0.2'), zorder=3) ####### Plot the detected lines in the final plot ####### # for loc in locations: # if loc.type == 'street' : # line_x = loc.start # line_y = loc.end # axes[2].plot(line_x, line_y, color='lime', linewidth=3) # else : axes[0].set_xlim(xmin-0.01, xmax+0.01) axes[0].set_ylim(ymin-0.01, ymax+0.01) axes[1].set_xlim(xmin-0.01, xmax+0.01) axes[1].set_ylim(ymin-0.01, ymax+0.01) axes[2].set_xlim(xmin-0.01, xmax+0.01) axes[2].set_ylim(ymin-0.01, ymax+0.01) print("\n\n\n") for landmark in shopping_landmarks : print(f"{landmark.name} is a shopping area with a score of {landmark.attractiveness}") plt.tight_layout() plt.show()