Adding features to find public toilets and shopping streets #41
| @@ -13,6 +13,7 @@ from math import sin, cos, sqrt, atan2, radians | ||||
|  | ||||
| EARTH_RADIUS_KM = 6373 | ||||
|  | ||||
|  | ||||
| class ShoppingLocation(BaseModel): | ||||
|     type: Literal['street', 'area'] | ||||
|     importance: int | ||||
| @@ -21,7 +22,6 @@ class ShoppingLocation(BaseModel): | ||||
|     end: Optional[list] = None | ||||
|  | ||||
|  | ||||
|  | ||||
| # Output to frontend | ||||
| class Landmark(BaseModel) : | ||||
|     # Properties of the landmark | ||||
| @@ -206,7 +206,7 @@ def create_landmark(shopping_location: ShoppingLocation): | ||||
|     # CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR) | ||||
|  | ||||
|     # Query neighborhoods and shopping malls | ||||
|     selectors = ['"place"~"^(suburb|neighborhood|city_block)$"', '"shop"="mall"'] | ||||
|     selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"', '"shop"="mall"'] | ||||
|  | ||||
|     min_dist = float('inf') | ||||
|     new_name = 'Shopping Area' | ||||
| @@ -220,22 +220,22 @@ def create_landmark(shopping_location: ShoppingLocation): | ||||
|             elementType = ['node', 'way', 'relation'], | ||||
|             selector = sel, | ||||
|             includeCenter = True, | ||||
|             out = 'body' | ||||
|             out = 'center' | ||||
|         ) | ||||
|  | ||||
|         try: | ||||
|             result = overpass.query(query) | ||||
|             # print(f'query OK with {len(result.elements())} elements') | ||||
|         except Exception as e: | ||||
|             raise Exception("query unsuccessful") | ||||
|  | ||||
|         for elem in result.elements(): | ||||
|  | ||||
|             location = (elem.lat(), elem.lon()) | ||||
|             location = (elem.centerLat(), elem.centerLon()) | ||||
|  | ||||
|             if location[0] is None :  | ||||
|                 location = (elem.centerLat(), elem.centerLon()) | ||||
|                 location = (elem.lat(), elem.lon()) | ||||
|                 if location[0] is None :  | ||||
|                     # print(f"Fetching coordinates failed with {elem.type()}/{elem.id()}") | ||||
|                     continue | ||||
|  | ||||
|             # print(f"Distance : {get_distance(shopping_location.centroid, location)}") | ||||
| @@ -246,8 +246,6 @@ def create_landmark(shopping_location: ShoppingLocation): | ||||
|                 osm_type = elem.type()              # Add type: 'way' or 'relation' | ||||
|                 osm_id = elem.id()                  # Add OSM id  | ||||
|  | ||||
|                 # print("closer thing found") | ||||
|  | ||||
|                 # add english name if it exists | ||||
|                 try : | ||||
|                     new_name_en = elem.tag('name:en') | ||||
| @@ -267,7 +265,7 @@ def create_landmark(shopping_location: ShoppingLocation): | ||||
|  | ||||
|  | ||||
| # Extract points | ||||
| points = extract_points('newyork_data.json') | ||||
| points = extract_points('vienna_data.json') | ||||
|  | ||||
| # print(len(points)) | ||||
|  | ||||
| @@ -311,9 +309,13 @@ axes[2].set_title('PCA Fitted Lines on Clusters') | ||||
| # Create a list of Landmarks for the shopping things | ||||
| shopping_landmarks = [] | ||||
| for loc in locations : | ||||
|     axes[2].scatter(loc.centroid[0], loc.centroid[1], color='lime', marker='x', s=200, linewidth=3) | ||||
|     axes[2].scatter(loc.centroid[1], loc.centroid[0], color='red', marker='x', s=200, linewidth=3) | ||||
|     landmark = create_landmark(loc) | ||||
|     shopping_landmarks.append(landmark) | ||||
|     axes[2].text(loc.centroid[1], loc.centroid[0], landmark.name,  | ||||
|              ha='center', va='top', fontsize=6,  | ||||
|              bbox=dict(facecolor='white', edgecolor='black', boxstyle='round,pad=0.2'), | ||||
|              zorder=3) | ||||
|      | ||||
|      | ||||
|  | ||||
|   | ||||
| @@ -0,0 +1,165 @@ | ||||
| import logging, yaml | ||||
| from typing import Optional, Literal | ||||
|  | ||||
| import numpy as np | ||||
| from sklearn.cluster import DBSCAN | ||||
| from sklearn.decomposition import PCA | ||||
| from pydantic import BaseModel | ||||
| from OSMPythonTools.overpass import Overpass, overpassQueryBuilder | ||||
| from OSMPythonTools.cachingStrategy import CachingStrategy, JSON | ||||
|  | ||||
| from ..structs.landmark import Landmark | ||||
| from ..structs.preferences import Preferences | ||||
| from ..utils.get_time_separation import get_distance | ||||
| from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR | ||||
|  | ||||
|  | ||||
| class ShoppingLocation(BaseModel): | ||||
|     type: Literal['street', 'area'] | ||||
|     importance: int | ||||
|     centroid: tuple | ||||
|     start: Optional[list] = None | ||||
|     end: Optional[list] = None | ||||
|  | ||||
|  | ||||
| class ShoppingManager: | ||||
|  | ||||
|     logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
|     def __init__(self) -> None: | ||||
|  | ||||
|         with OPTIMIZER_PARAMETERS_PATH.open('r') as f: | ||||
|                 parameters = yaml.safe_load(f) | ||||
|                 self.walking_speed = parameters['average_walking_speed'] | ||||
|                 self.detour_factor = parameters['detour_factor'] | ||||
|  | ||||
|         self.overpass = Overpass() | ||||
|         CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR) | ||||
|  | ||||
|  | ||||
|     def generate_landmarks_list(self, center_coordinates: tuple[float, float], preferences: Preferences) : | ||||
|  | ||||
|         max_walk_dist = (preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor | ||||
|         reachable_bbox_side = min(max_walk_dist, self.max_bbox_side) | ||||
|  | ||||
|         # use set to avoid duplicates, this requires some __methods__ to be set in Landmark | ||||
|         shopping_landmarks = set() | ||||
|  | ||||
|         # Create a bbox using the around technique. | ||||
|         bbox = tuple((f"around:{reachable_bbox_side/2}", str(center_coordinates[0]), str(center_coordinates[1]))) | ||||
|         # list for sightseeing | ||||
|  | ||||
|  | ||||
| def get_clusters(points: list) -> tuple: | ||||
|     """ | ||||
|     Apply DBSCAN to find clusters. | ||||
|     """ | ||||
|     if len(points) > 400 : | ||||
|         dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree')  # for large cities | ||||
|     else : | ||||
|         dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree')  # for small cities | ||||
|  | ||||
|     labels = dbscan.fit_predict(points) | ||||
|  | ||||
|     # Separate clustered points and noise points | ||||
|     clustered_points = points[labels != -1] | ||||
|     clustered_labels = labels[labels != -1] | ||||
|  | ||||
|     return clustered_points, clustered_labels | ||||
|  | ||||
|  | ||||
|  | ||||
| def filter_clusters(cluster_points, cluster_labels): | ||||
|     """ | ||||
|     Remove clusters of less importance. | ||||
|     """ | ||||
|     label_counts = np.bincount(cluster_labels) | ||||
|  | ||||
|     # Step 3: Get the indices (labels) of the 5 largest clusters | ||||
|     top_5_labels = np.argsort(label_counts)[-5:]  # Get the largest 5 clusters | ||||
|  | ||||
|     # Step 4: Filter points to keep only the points in the top 5 clusters | ||||
|     filtered_cluster_points = [] | ||||
|     filtered_cluster_labels = [] | ||||
|  | ||||
|     for label in top_5_labels: | ||||
|         filtered_cluster_points.append(cluster_points[cluster_labels == label]) | ||||
|         filtered_cluster_labels.append(np.full((label_counts[label],), label))  # Replicate the label | ||||
|  | ||||
|     # Concatenate filtered clusters into a single array | ||||
|     return np.vstack(filtered_cluster_points), np.concatenate(filtered_cluster_labels) | ||||
|  | ||||
|  | ||||
| def fit_lines(points, labels): | ||||
|     """ | ||||
|     Fit lines to identified clusters. | ||||
|     """ | ||||
|     all_x = [] | ||||
|     all_y = [] | ||||
|     lines = [] | ||||
|     locations = [] | ||||
|  | ||||
|     for label in set(labels): | ||||
|         cluster_points = points[labels == label] | ||||
|  | ||||
|         # If there's not enough points, skip | ||||
|         if len(cluster_points) < 2: | ||||
|             continue | ||||
|  | ||||
|         # Apply PCA to find the principal component (i.e., the line of best fit) | ||||
|         pca = PCA(n_components=1) | ||||
|         pca.fit(cluster_points) | ||||
|  | ||||
|         direction = pca.components_[0] | ||||
|         centroid = pca.mean_ | ||||
|  | ||||
|         # Project the cluster points onto the principal direction (line direction) | ||||
|         projections = np.dot(cluster_points - centroid, direction) | ||||
|  | ||||
|         # Get the range of the projections to find the approximate length of the cluster | ||||
|         cluster_length = projections.max() - projections.min() | ||||
|  | ||||
|         # Now adjust `t` so that it scales with the cluster length | ||||
|         t = np.linspace(-cluster_length / 2.75, cluster_length / 2.75, 10) | ||||
|  | ||||
|         # Calculate the start and end of the line based on min/max projections | ||||
|         start_point = centroid[0] + t*direction[0] | ||||
|         end_point = centroid[1] + t*direction[1] | ||||
|          | ||||
|         # Store the line | ||||
|         lines.append((start_point, end_point)) | ||||
|  | ||||
|         # For visualization, store the points | ||||
|         all_x.append(min(start_point)) | ||||
|         all_x.append(max(start_point)) | ||||
|         all_y.append(min(end_point)) | ||||
|         all_y.append(max(end_point)) | ||||
|  | ||||
|         if np.linalg.norm(t) <= 0.0045 : | ||||
|             loc = ShoppingLocation( | ||||
|                 type='area', | ||||
|                 centroid=tuple((centroid[1], centroid[0])), | ||||
|                 importance = len(cluster_points), | ||||
|             ) | ||||
|         else : | ||||
|             loc = ShoppingLocation( | ||||
|                 type='street', | ||||
|                 centroid=tuple((centroid[1], centroid[0])), | ||||
|                 importance = len(cluster_points), | ||||
|                 start=start_point, | ||||
|                 end=end_point | ||||
|             ) | ||||
|  | ||||
|         locations.append(loc) | ||||
|  | ||||
|     xmin = min(all_x) | ||||
|     xmax = max(all_x) | ||||
|     ymin = min(all_y) | ||||
|     ymax = max(all_y) | ||||
|     corners = (xmin, xmax, ymin, ymax) | ||||
|  | ||||
|     return corners, locations | ||||
|  | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -1,7 +1,4 @@ | ||||
| import math | ||||
| import yaml | ||||
| import logging | ||||
|  | ||||
| import math, yaml, logging | ||||
| from OSMPythonTools.overpass import Overpass, overpassQueryBuilder | ||||
| from OSMPythonTools.cachingStrategy import CachingStrategy, JSON | ||||
|  | ||||
| @@ -79,7 +76,7 @@ class LandmarkManager: | ||||
|         # use set to avoid duplicates, this requires some __methods__ to be set in Landmark | ||||
|         all_landmarks = set() | ||||
|  | ||||
|         # Create a bbox using the around | ||||
|         # Create a bbox using the around technique | ||||
|         bbox = tuple((f"around:{reachable_bbox_side/2}", str(center_coordinates[0]), str(center_coordinates[1]))) | ||||
|         # list for sightseeing | ||||
|         if preferences.sightseeing.score != 0: | ||||
| @@ -219,7 +216,7 @@ class LandmarkManager: | ||||
|                 selector = sel, | ||||
|                 conditions = query_conditions,        # except for nature.... | ||||
|                 includeCenter = True, | ||||
|                 out = 'body' | ||||
|                 out = 'center' | ||||
|                 ) | ||||
|             self.logger.debug(f"Query: {query}") | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user