fixed up clusters

This commit is contained in:
Helldragon67 2025-01-23 10:33:32 +01:00
parent ca40de82dd
commit 78f1dcaab4
6 changed files with 105 additions and 84 deletions

File diff suppressed because one or more lines are too long

@ -39,6 +39,8 @@ def build_query(area: tuple, element_types: ElementTypes, selector: str,
"""
if not isinstance(conditions, list) :
conditions = [conditions]
if not isinstance(element_types, list) :
element_types = [element_types]
query = '('
@ -60,7 +62,7 @@ def build_query(area: tuple, element_types: ElementTypes, selector: str,
return query
def send_overpass_query(query: str, use_cache: bool = True) -> dict:
def send_overpass_query(query: str) -> dict:
"""
Sends the Overpass QL query to the Overpass API and returns the parsed JSON response.
@ -76,7 +78,7 @@ def send_overpass_query(query: str, use_cache: bool = True) -> dict:
# Try to fetch the result from the cache
cached_response = CachingStrategy.get(cache_key)
if cached_response:
if cached_response is not None :
print("Cache hit!")
return cached_response

@ -51,16 +51,14 @@ sightseeing:
- place_of_worship
- fountain
- townhall
water:
- reflecting_pool
water: reflecting_pool
bridge:
- aqueduct
- viaduct
- boardwalk
- cantilever
- abandoned
building:
- cathedral
building: cathedral
# unused sightseeing/buildings:
# - church

@ -11,7 +11,7 @@ def client():
"""Client used to call the app."""
return TestClient(app)
'''
def test_turckheim(client, request): # pylint: disable=redefined-outer-name
"""
Test n°1 : Custom test in Turckheim to ensure small villages are also supported.
@ -54,7 +54,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
assert len(landmarks) > 2 # check that there is something to visit
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
# assert 2==3
'''
def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
"""
@ -97,7 +97,7 @@ def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
# assert 2 == 3
'''
def test_cologne(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
@ -216,7 +216,7 @@ def test_zurich(client, request) : # pylint: disable=redefined-outer-name
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
def test_paris(client, request) : # pylint: disable=redefined-outer-name
"""
@ -257,7 +257,7 @@ def test_paris(client, request) : # pylint: disable=redefined-outer-name
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
def test_new_york(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°2 : Custom test in New York (les Halles) centre to ensure proper decision making in crowded area.
@ -337,7 +337,7 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
# def test_new_trip_single_prefs(client):
# response = client.post(
# "/trip/new",

@ -5,8 +5,10 @@ from typing import Literal
import numpy as np
from sklearn.cluster import DBSCAN
from pydantic import BaseModel
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
# from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
# from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
from ..overpass.overpass import build_query, send_overpass_query
from ..overpass.caching_strategy import CachingStrategy
from ..structs.landmark import Landmark
from .get_time_distance import get_distance
@ -79,43 +81,54 @@ class ClusterManager:
Args:
bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
"""
self.overpass = Overpass()
CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
# self.overpass = Overpass()
# CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
CachingStrategy.use('XML', cache_dir=OSM_CACHE_DIR)
self.cluster_type = cluster_type
if cluster_type == 'shopping' :
elem_type = ['node']
sel = ['"shop"~"^(bag|boutique|clothes)$"']
out = 'skel'
else :
elem_type = ['way']
sel = ['"historic"="building"']
out = 'center'
osm_types = ['node']
sel = '"shop"~"^(bag|boutique|clothes)$"'
out = 'ids center'
elif cluster_type == 'sightseeing' :
osm_types = ['way']
sel = '"historic"~"^(monument|building|yes)$"'
out = 'ids center'
# Initialize the points for cluster detection
query = overpassQueryBuilder(
bbox = bbox,
elementType = elem_type,
query = build_query(
area = bbox,
element_types = osm_types,
selector = sel,
includeCenter = True,
out = out
)
self.logger.debug(f"Cluster query: {query}")
try:
result = self.overpass.query(query)
result = send_overpass_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
if len(result.elements()) == 0 :
if result is None :
self.logger.error(f"Error fetching {cluster_type} clusters, overpass query returned None.")
self.valid = False
else :
points = []
for elem in result.elements() :
coords = tuple((elem.lat(), elem.lon()))
if coords[0] is None :
coords = tuple((elem.centerLat(), elem.centerLon()))
points.append(coords)
for osm_type in osm_types :
for elem in result.findall(osm_type):
center = elem.find('center')
if osm_type != 'node' :
lat = float(center.get('lat'))
lon = float(center.get('lon'))
points.append(tuple((lat, lon)))
else :
lat = float(elem.get('lat'))
lon = float(elem.get('lon'))
points.append(tuple((lat, lon)))
self.all_points = np.array(points)
@ -123,14 +136,14 @@ class ClusterManager:
if self.cluster_type == 'shopping' and len(self.all_points) > 200 :
dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities
elif self.cluster_type == 'sightseeing' :
dbscan = DBSCAN(eps=0.0025, min_samples=15, algorithm='kd_tree') # for historic neighborhoods
dbscan = DBSCAN(eps=0.003, min_samples=10, algorithm='kd_tree') # for historic neighborhoods
else :
dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities
labels = dbscan.fit_predict(self.all_points)
# Check that there are at least 2 different clusters
if len(set(labels)) > 2 :
# Check that there is at least 1 cluster
if len(set(labels)) > 1 :
self.logger.debug(f"Found {len(set(labels))} different clusters.")
# Separate clustered points and noise points
self.cluster_points = self.all_points[labels != -1]
@ -139,6 +152,7 @@ class ClusterManager:
self.valid = True
else :
self.logger.error(f"Detected 0 {cluster_type} clusters.")
self.valid = False
@ -208,7 +222,7 @@ class ClusterManager:
# Define the bounding box for a given radius around the coordinates
lat, lon = cluster.centroid
bbox = ("around:1000", str(lat), str(lon))
bbox = (1000, lat, lon)
# Query neighborhoods and shopping malls
selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"']
@ -222,48 +236,56 @@ class ClusterManager:
t = 15
min_dist = float('inf')
new_name_en = None
osm_id = 0
osm_type = 'node'
osm_types = ['node', 'way', 'relation']
for sel in selectors :
query = overpassQueryBuilder(
bbox = bbox,
elementType = ['node', 'way', 'relation'],
query = build_query(
area = bbox,
element_types = osm_types,
selector = sel,
includeCenter = True,
out = 'center'
out = 'ids center'
)
try:
result = self.overpass.query(query)
result = send_overpass_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
continue
for elem in result.elements():
location = (elem.centerLat(), elem.centerLon())
if result is None :
self.logger.error(f"Error fetching landmarks: {e}")
continue
# Skip if element has neither name or location
if elem.tag('name') is None :
continue
if location[0] is None :
location = (elem.lat(), elem.lon())
if location[0] is None :
for osm_type in osm_types :
for elem in result.findall(osm_type):
name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None
center = elem.find('center')
# Extract the center latitude and longitude if available.
if name is None :
continue
d = get_distance(cluster.centroid, location)
if d < min_dist :
min_dist = d
new_name = elem.tag('name')
osm_type = elem.type() # Add type: 'way' or 'relation'
osm_id = elem.id() # Add OSM id
if osm_type != 'node' :
lat = float(center.get('lat'))
lon = float(center.get('lon'))
else :
lat = float(elem.get('lat'))
lon = float(elem.get('lon'))
# Add english name if it exists
try :
new_name_en = elem.tag('name:en')
except Exception:
pass
coords = tuple((lat, lon))
if coords is None :
continue
d = get_distance(cluster.centroid, coords)
if d < min_dist :
min_dist = d
new_name = name
osm_type = osm_type # Add type: 'way' or 'relation'
osm_id = elem.get('id') # Add OSM id
return Landmark(
name=new_name,
@ -273,7 +295,6 @@ class ClusterManager:
n_tags=0,
osm_id=osm_id,
osm_type=osm_type,
name_en=new_name_en,
duration=t
)

@ -96,10 +96,10 @@ class LandmarkManager:
self.logger.debug('Fetching sightseeing clusters...')
# special pipeline for historic neighborhoods
# neighborhood_manager = ClusterManager(bbox, 'sightseeing')
# historic_clusters = neighborhood_manager.generate_clusters()
# all_landmarks.update(historic_clusters)
# self.logger.debug('Sightseeing clusters done')
neighborhood_manager = ClusterManager(bbox, 'sightseeing')
historic_clusters = neighborhood_manager.generate_clusters()
all_landmarks.update(historic_clusters)
self.logger.debug('Sightseeing clusters done')
# list for nature
if preferences.nature.score != 0:
@ -120,10 +120,10 @@ class LandmarkManager:
all_landmarks.update(current_landmarks)
# special pipeline for shopping malls
# shopping_manager = ClusterManager(bbox, 'shopping')
# shopping_clusters = shopping_manager.generate_clusters()
# all_landmarks.update(shopping_clusters)
# self.logger.debug('Shopping clusters done')
shopping_manager = ClusterManager(bbox, 'shopping')
shopping_clusters = shopping_manager.generate_clusters()
all_landmarks.update(shopping_clusters)
self.logger.debug('Shopping clusters done')
@ -210,14 +210,14 @@ class LandmarkManager:
self.logger.error(f"Error fetching landmarks: {e}")
continue
return_list += self.parse_overpass_result(result, landmarktype, preference_level)
return_list += self.xml_to_landmarks(result, landmarktype, preference_level)
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
return return_list
def parse_overpass_result(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]:
def xml_to_landmarks(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]:
"""
Parse the Overpass API result and extract landmarks.
@ -239,9 +239,6 @@ class LandmarkManager:
landmarks = []
for osm_type in ['node', 'way', 'relation'] :
for elem in root.findall(osm_type):
# self.logger.debug('new landmark')
# Extract basic info from the landmark.
name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None
center = elem.find('center')
tags = elem.findall('tag')
@ -253,6 +250,7 @@ class LandmarkManager:
coords = tuple((lat, lon))
else :
continue
# Convert this to Landmark object
landmark = Landmark(name=name,
@ -307,6 +305,8 @@ class LandmarkManager:
landmark.duration = 5
else:
# add them to cache here before setting the score
# name should be : 'osm_type + str(osm_id) + 'json'
self.set_landmark_score(landmark, landmarktype, preference_level)
landmarks.append(landmark)
# self.logger.debug('new landmark added')