amazing cache #55
File diff suppressed because one or more lines are too long
@ -129,8 +129,6 @@ def new_trip(preferences: Preferences,
|
|||||||
trip = Trip.from_linked_landmarks(linked_tour, cache_client)
|
trip = Trip.from_linked_landmarks(linked_tour, cache_client)
|
||||||
logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.')
|
logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.')
|
||||||
|
|
||||||
background_tasks = BackgroundTasks(fill_cache())
|
|
||||||
|
|
||||||
return trip
|
return trip
|
||||||
|
|
||||||
|
|
||||||
@ -148,6 +146,7 @@ def get_trip(trip_uuid: str) -> Trip:
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
trip = cache_client.get(f"trip_{trip_uuid}")
|
trip = cache_client.get(f"trip_{trip_uuid}")
|
||||||
|
background_tasks = BackgroundTasks(fill_cache())
|
||||||
return trip
|
return trip
|
||||||
except KeyError as exc:
|
except KeyError as exc:
|
||||||
raise HTTPException(status_code=404, detail="Trip not found") from exc
|
raise HTTPException(status_code=404, detail="Trip not found") from exc
|
||||||
|
@ -573,7 +573,7 @@ class Optimizer:
|
|||||||
prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks)
|
prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks)
|
||||||
|
|
||||||
# Solve the problem and extract results.
|
# Solve the problem and extract results.
|
||||||
prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1, timeLimit=10, warmStart=False))
|
prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1, timeLimit=3, warmStart=False))
|
||||||
status = pl.LpStatus[prob.status]
|
status = pl.LpStatus[prob.status]
|
||||||
solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
|
solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
|
||||||
|
|
||||||
@ -614,5 +614,5 @@ class Optimizer:
|
|||||||
order = self.get_order(solution)
|
order = self.get_order(solution)
|
||||||
tour = [landmarks[i] for i in order]
|
tour = [landmarks[i] for i in order]
|
||||||
|
|
||||||
self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
|
self.logger.info(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
|
||||||
return tour
|
return tour
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
import xml.etree.ElementTree as ET
|
import xml.etree.ElementTree as ET
|
||||||
import hashlib
|
import hashlib
|
||||||
import time
|
|
||||||
|
|
||||||
from ..constants import OSM_CACHE_DIR, OSM_TYPES
|
from ..constants import OSM_CACHE_DIR, OSM_TYPES
|
||||||
|
|
||||||
|
@ -32,92 +32,74 @@ class Overpass :
|
|||||||
def send_query(self, bbox: tuple, osm_types: OSM_TYPES,
|
def send_query(self, bbox: tuple, osm_types: OSM_TYPES,
|
||||||
selector: str, conditions=[], out='center') -> ET:
|
selector: str, conditions=[], out='center') -> ET:
|
||||||
"""
|
"""
|
||||||
Sends the Overpass QL query to the Overpass API and returns the parsed JSON response.
|
Sends the Overpass QL query to the Overpass API and returns the parsed XML response.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
query (str): The Overpass QL query to be sent to the Overpass API.
|
bbox (tuple): Bounding box for the query.
|
||||||
|
osm_types (list[str]): List of OSM element types (e.g., 'node', 'way').
|
||||||
|
selector (str): Key or tag to filter OSM elements (e.g., 'highway').
|
||||||
|
conditions (list): Optional list of additional filter conditions in Overpass QL format.
|
||||||
|
out (str): Output format ('center', 'body', etc.). Defaults to 'center'.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: The parsed JSON response from the Overpass API, or None if the request fails.
|
ET.Element: Parsed XML response from the Overpass API, or cached data if available.
|
||||||
"""
|
"""
|
||||||
# Determine which grid cells overlap with this bounding box.
|
# Determine which grid cells overlap with this bounding box.
|
||||||
overlapping_cells = Overpass.get_overlapping_cells(bbox)
|
overlapping_cells = Overpass._get_overlapping_cells(bbox)
|
||||||
|
|
||||||
# Check the cache for any data that overlaps with these cells
|
# Retrieve cached data and identify missing cache entries
|
||||||
cell_key_dict = {}
|
cached_responses, hollow_cache_keys = self._retrieve_cached_data(overlapping_cells, osm_types, selector, conditions, out)
|
||||||
for cell in overlapping_cells :
|
|
||||||
for elem in osm_types :
|
|
||||||
key_str = f"{elem}[{selector}]{conditions}({','.join(map(str, cell))})"
|
|
||||||
|
|
||||||
cell_key_dict[cell] = get_cache_key(key_str)
|
|
||||||
|
|
||||||
cached_responses = []
|
|
||||||
hollow_cache_keys = []
|
|
||||||
|
|
||||||
# Retrieve the cached data and mark the missing entries as hollow
|
|
||||||
for cell, key in cell_key_dict.items():
|
|
||||||
cached_data = self.caching_strategy.get(key)
|
|
||||||
if cached_data is not None :
|
|
||||||
cached_responses.append(cached_data)
|
|
||||||
else:
|
|
||||||
# Cache miss: Mark the cache key as hollow
|
|
||||||
self.caching_strategy.set_hollow(key, cell, osm_types, selector, conditions, out)
|
|
||||||
hollow_cache_keys.append(key)
|
|
||||||
|
|
||||||
# If there is no missing data, return the cached responses
|
# If there is no missing data, return the cached responses
|
||||||
if not hollow_cache_keys :
|
if not hollow_cache_keys :
|
||||||
self.logger.debug(f'Cache hit.')
|
self.logger.debug(f'Cache hit.')
|
||||||
return self.combine_cached_data(cached_responses)
|
return self._combine_cached_data(cached_responses)
|
||||||
|
|
||||||
# TODO If there is SOME missing data : hybrid stuff with partial cache
|
# TODO If there is SOME missing data : hybrid stuff with partial cache
|
||||||
|
|
||||||
# Build the query string in case of needed overpass query
|
# Missing data: Make a query to Overpass API
|
||||||
query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
|
query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
|
||||||
|
self.fetch_data_from_api(query_str)
|
||||||
|
|
||||||
# Prepare the data to be sent as POST request, encoded as bytes
|
|
||||||
data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
|
|
||||||
|
|
||||||
|
def fetch_data_from_api(self, query_str: str, cache_key: str = None) -> ET.Element:
|
||||||
|
"""
|
||||||
|
Fetch data from the Overpass API and update the cache.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
query_str (str): The Overpass query string.
|
||||||
|
cached_responses (list): Cached responses to combine with fetched data.
|
||||||
|
hollow_cache_keys (list): Cache keys for missing data to be updated.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ET.Element: Combined cached and fetched data.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
# Create a Request object with the specified URL, data, and headers
|
data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
|
||||||
request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
|
request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
|
||||||
|
|
||||||
# Send the request and read the response
|
|
||||||
with urllib.request.urlopen(request) as response:
|
with urllib.request.urlopen(request) as response:
|
||||||
# Read and decode the response
|
|
||||||
response_data = response.read().decode('utf-8')
|
response_data = response.read().decode('utf-8')
|
||||||
root = ET.fromstring(response_data)
|
root = ET.fromstring(response_data)
|
||||||
|
|
||||||
|
if cache_key is not None :
|
||||||
|
self.caching_strategy.set(cache_key, root)
|
||||||
|
self.logger.debug(f'Cache set.')
|
||||||
|
else :
|
||||||
self.logger.debug(f'Cache miss. Fetching data through Overpass\nQuery = {query_str}')
|
self.logger.debug(f'Cache miss. Fetching data through Overpass\nQuery = {query_str}')
|
||||||
|
|
||||||
return root
|
return root
|
||||||
|
|
||||||
except urllib.error.URLError as e:
|
except urllib.error.URLError as e:
|
||||||
raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
|
self.logger.error(f"Error connecting to Overpass API: {e}")
|
||||||
|
raise ConnectionError(f"Error connecting to Overpass API: {e}")
|
||||||
|
|
||||||
|
|
||||||
def fill_cache(self, xml_string: str) :
|
def fill_cache(self, xml_string: str) :
|
||||||
|
"""
|
||||||
# Build the query using info from hollow cache entry
|
Fill cache with data by using a hollow cache entry's information.
|
||||||
query_str, cache_key = Overpass.build_query_from_hollow(xml_string)
|
"""
|
||||||
|
query_str, cache_key = Overpass._build_query_from_hollow(xml_string)
|
||||||
# Prepare the data to be sent as POST request, encoded as bytes
|
self.fetch_data_from_api(query_str, cache_key)
|
||||||
data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Create a Request object with the specified URL, data, and headers
|
|
||||||
request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
|
|
||||||
|
|
||||||
# Send the request and read the response
|
|
||||||
with urllib.request.urlopen(request) as response:
|
|
||||||
# Read and decode the response
|
|
||||||
response_data = response.read().decode('utf-8')
|
|
||||||
root = ET.fromstring(response_data)
|
|
||||||
|
|
||||||
self.caching_strategy.set(cache_key, root)
|
|
||||||
self.logger.debug(f'Cache set')
|
|
||||||
|
|
||||||
except urllib.error.URLError as e:
|
|
||||||
raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
|
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -169,17 +151,56 @@ class Overpass :
|
|||||||
return query
|
return query
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
def _retrieve_cached_data(self, overlapping_cells: list, osm_types: OSM_TYPES, selector: str, conditions: list, out: str):
|
||||||
def build_query_from_hollow(xml_string):
|
"""
|
||||||
"""Extract variables from an XML string."""
|
Retrieve cached data and identify missing cache entries.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
overlapping_cells (list): Cells to check for cached data.
|
||||||
|
osm_types (list): OSM types (e.g., 'node', 'way').
|
||||||
|
selector (str): Key or tag to filter OSM elements.
|
||||||
|
conditions (list): Additional conditions to apply.
|
||||||
|
out (str): Output format.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple: A tuple containing:
|
||||||
|
- cached_responses (list): List of cached data found.
|
||||||
|
- hollow_cache_keys (list): List of keys with missing data.
|
||||||
|
"""
|
||||||
|
cell_key_dict = {}
|
||||||
|
for cell in overlapping_cells :
|
||||||
|
for elem in osm_types :
|
||||||
|
key_str = f"{elem}[{selector}]{conditions}({','.join(map(str, cell))})"
|
||||||
|
|
||||||
|
cell_key_dict[cell] = get_cache_key(key_str)
|
||||||
|
|
||||||
|
cached_responses = []
|
||||||
|
hollow_cache_keys = []
|
||||||
|
|
||||||
|
# Retrieve the cached data and mark the missing entries as hollow
|
||||||
|
for cell, key in cell_key_dict.items():
|
||||||
|
cached_data = self.caching_strategy.get(key)
|
||||||
|
if cached_data is not None :
|
||||||
|
cached_responses.append(cached_data)
|
||||||
|
else:
|
||||||
|
self.caching_strategy.set_hollow(key, cell, osm_types, selector, conditions, out)
|
||||||
|
hollow_cache_keys.append(key)
|
||||||
|
|
||||||
|
return cached_responses, hollow_cache_keys
|
||||||
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _build_query_from_hollow(xml_string):
|
||||||
|
"""
|
||||||
|
Build query string using information from a hollow cache entry.
|
||||||
|
"""
|
||||||
# Parse the XML string into an ElementTree object
|
# Parse the XML string into an ElementTree object
|
||||||
root = ET.fromstring(xml_string)
|
root = ET.fromstring(xml_string)
|
||||||
|
|
||||||
# Extract values from the XML tree
|
# Extract values from the XML tree
|
||||||
key = root.find('key').text
|
key = root.find('key').text
|
||||||
cell = tuple(map(float, root.find('cell').text.strip('()').split(',')))
|
cell = tuple(map(float, root.find('cell').text.strip('()').split(',')))
|
||||||
bbox = Overpass.get_bbox_from_grid_cell(cell[0], cell[1])
|
bbox = Overpass._get_bbox_from_grid_cell(cell[0], cell[1])
|
||||||
osm_types = root.find('osm_types').text.split(',')
|
osm_types = root.find('osm_types').text.split(',')
|
||||||
selector = root.find('selector').text
|
selector = root.find('selector').text
|
||||||
conditions = root.find('conditions').text.split(',') if root.find('conditions').text != "none" else []
|
conditions = root.find('conditions').text.split(',') if root.find('conditions').text != "none" else []
|
||||||
@ -191,7 +212,26 @@ class Overpass :
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_grid_cell(lat: float, lon: float):
|
def _get_overlapping_cells(query_bbox: tuple):
|
||||||
|
"""
|
||||||
|
Returns a set of all grid cells that overlap with the given bounding box.
|
||||||
|
"""
|
||||||
|
# Extract location from the query bbox
|
||||||
|
lat_min, lon_min, lat_max, lon_max = query_bbox
|
||||||
|
|
||||||
|
min_lat_cell, min_lon_cell = Overpass._get_grid_cell(lat_min, lon_min)
|
||||||
|
max_lat_cell, max_lon_cell = Overpass._get_grid_cell(lat_max, lon_max)
|
||||||
|
|
||||||
|
overlapping_cells = set()
|
||||||
|
for lat_idx in range(min_lat_cell, max_lat_cell + 1):
|
||||||
|
for lon_idx in range(min_lon_cell, max_lon_cell + 1):
|
||||||
|
overlapping_cells.add((lat_idx, lon_idx))
|
||||||
|
|
||||||
|
return overlapping_cells
|
||||||
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_grid_cell(lat: float, lon: float):
|
||||||
"""
|
"""
|
||||||
Returns the grid cell coordinates for a given latitude and longitude.
|
Returns the grid cell coordinates for a given latitude and longitude.
|
||||||
Each grid cell is 0.05°lat x 0.05°lon resolution in size.
|
Each grid cell is 0.05°lat x 0.05°lon resolution in size.
|
||||||
@ -202,7 +242,7 @@ class Overpass :
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_bbox_from_grid_cell(lat_index: int, lon_index: int):
|
def _get_bbox_from_grid_cell(lat_index: int, lon_index: int):
|
||||||
"""
|
"""
|
||||||
Returns the bounding box for a given grid cell index.
|
Returns the bounding box for a given grid cell index.
|
||||||
Each grid cell is resolution x resolution in size.
|
Each grid cell is resolution x resolution in size.
|
||||||
@ -221,26 +261,7 @@ class Overpass :
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_overlapping_cells(query_bbox: tuple):
|
def _combine_cached_data(cached_data_list):
|
||||||
"""
|
|
||||||
Returns a set of all grid cells that overlap with the given bounding box.
|
|
||||||
"""
|
|
||||||
# Extract location from the query bbox
|
|
||||||
lat_min, lon_min, lat_max, lon_max = query_bbox
|
|
||||||
|
|
||||||
min_lat_cell, min_lon_cell = Overpass.get_grid_cell(lat_min, lon_min)
|
|
||||||
max_lat_cell, max_lon_cell = Overpass.get_grid_cell(lat_max, lon_max)
|
|
||||||
|
|
||||||
overlapping_cells = set()
|
|
||||||
for lat_idx in range(min_lat_cell, max_lat_cell + 1):
|
|
||||||
for lon_idx in range(min_lon_cell, max_lon_cell + 1):
|
|
||||||
overlapping_cells.add((lat_idx, lon_idx))
|
|
||||||
|
|
||||||
return overlapping_cells
|
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def combine_cached_data(cached_data_list):
|
|
||||||
"""
|
"""
|
||||||
Combines data from multiple cached responses into a single result.
|
Combines data from multiple cached responses into a single result.
|
||||||
"""
|
"""
|
||||||
|
@ -27,8 +27,8 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
|
|||||||
"/trip/new",
|
"/trip/new",
|
||||||
json={
|
json={
|
||||||
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
|
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
|
||||||
"nature": {"type": "nature", "score": 0},
|
"nature": {"type": "nature", "score": 5},
|
||||||
"shopping": {"type": "shopping", "score": 0},
|
"shopping": {"type": "shopping", "score": 5},
|
||||||
"max_time_minute": duration_minutes,
|
"max_time_minute": duration_minutes,
|
||||||
"detour_tolerance_minute": 0},
|
"detour_tolerance_minute": 0},
|
||||||
"start": [48.084588, 7.280405]
|
"start": [48.084588, 7.280405]
|
||||||
@ -100,7 +100,7 @@ def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
|
|||||||
|
|
||||||
def test_cologne(client, request) : # pylint: disable=redefined-outer-name
|
def test_cologne(client, request) : # pylint: disable=redefined-outer-name
|
||||||
"""
|
"""
|
||||||
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
|
Test n°3 : Custom test in Cologne to ensure proper decision making in crowded area.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
client:
|
client:
|
||||||
@ -141,7 +141,7 @@ def test_cologne(client, request) : # pylint: disable=redefined-outer-name
|
|||||||
|
|
||||||
def test_strasbourg(client, request) : # pylint: disable=redefined-outer-name
|
def test_strasbourg(client, request) : # pylint: disable=redefined-outer-name
|
||||||
"""
|
"""
|
||||||
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
|
Test n°4 : Custom test in Strasbourg to ensure proper decision making in crowded area.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
client:
|
client:
|
||||||
@ -182,7 +182,7 @@ def test_strasbourg(client, request) : # pylint: disable=redefined-outer-name
|
|||||||
|
|
||||||
def test_zurich(client, request) : # pylint: disable=redefined-outer-name
|
def test_zurich(client, request) : # pylint: disable=redefined-outer-name
|
||||||
"""
|
"""
|
||||||
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
|
Test n°5 : Custom test in Zurich to ensure proper decision making in crowded area.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
client:
|
client:
|
||||||
@ -223,24 +223,24 @@ def test_zurich(client, request) : # pylint: disable=redefined-outer-name
|
|||||||
|
|
||||||
def test_paris(client, request) : # pylint: disable=redefined-outer-name
|
def test_paris(client, request) : # pylint: disable=redefined-outer-name
|
||||||
"""
|
"""
|
||||||
Test n°2 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area.
|
Test n°6 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
client:
|
client:
|
||||||
request:
|
request:
|
||||||
"""
|
"""
|
||||||
start_time = time.time() # Start timer
|
start_time = time.time() # Start timer
|
||||||
duration_minutes = 300
|
duration_minutes = 200
|
||||||
|
|
||||||
response = client.post(
|
response = client.post(
|
||||||
"/trip/new",
|
"/trip/new",
|
||||||
json={
|
json={
|
||||||
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
|
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
|
||||||
"nature": {"type": "nature", "score": 5},
|
"nature": {"type": "nature", "score": 0},
|
||||||
"shopping": {"type": "shopping", "score": 5},
|
"shopping": {"type": "shopping", "score": 5},
|
||||||
"max_time_minute": duration_minutes,
|
"max_time_minute": duration_minutes,
|
||||||
"detour_tolerance_minute": 0},
|
"detour_tolerance_minute": 0},
|
||||||
"start": [48.86248803298562, 2.346451131285925]
|
"start": [48.85468881798671, 2.3423925755998374]
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
result = response.json()
|
result = response.json()
|
||||||
@ -264,7 +264,7 @@ def test_paris(client, request) : # pylint: disable=redefined-outer-name
|
|||||||
|
|
||||||
def test_new_york(client, request) : # pylint: disable=redefined-outer-name
|
def test_new_york(client, request) : # pylint: disable=redefined-outer-name
|
||||||
"""
|
"""
|
||||||
Test n°2 : Custom test in New York (les Halles) centre to ensure proper decision making in crowded area.
|
Test n°7 : Custom test in New York to ensure proper decision making in crowded area.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
client:
|
client:
|
||||||
@ -305,7 +305,7 @@ def test_new_york(client, request) : # pylint: disable=redefined-outer-name
|
|||||||
|
|
||||||
def test_shopping(client, request) : # pylint: disable=redefined-outer-name
|
def test_shopping(client, request) : # pylint: disable=redefined-outer-name
|
||||||
"""
|
"""
|
||||||
Test n°3 : Custom test in Lyon centre to ensure shopping clusters are found.
|
Test n°8 : Custom test in Lyon centre to ensure shopping clusters are found.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
client:
|
client:
|
||||||
|
@ -5,6 +5,7 @@ import xml.etree.ElementTree as ET
|
|||||||
from ..overpass.overpass import Overpass, get_base_info
|
from ..overpass.overpass import Overpass, get_base_info
|
||||||
from ..structs.landmark import Toilets
|
from ..structs.landmark import Toilets
|
||||||
from ..constants import OSM_CACHE_DIR
|
from ..constants import OSM_CACHE_DIR
|
||||||
|
from .utils import create_bbox
|
||||||
|
|
||||||
|
|
||||||
# silence the overpass logger
|
# silence the overpass logger
|
||||||
@ -53,20 +54,18 @@ class ToiletsManager:
|
|||||||
list[Toilets]: A list of `Toilets` objects containing detailed information
|
list[Toilets]: A list of `Toilets` objects containing detailed information
|
||||||
about the toilets found around the given coordinates.
|
about the toilets found around the given coordinates.
|
||||||
"""
|
"""
|
||||||
bbox = tuple((self.radius, self.location[0], self.location[1]))
|
bbox = create_bbox(self.location, self.radius)
|
||||||
osm_types = ['node', 'way', 'relation']
|
osm_types = ['node', 'way', 'relation']
|
||||||
toilets_list = []
|
toilets_list = []
|
||||||
|
|
||||||
query = self.overpass.build_query(
|
query = self.overpass.build_query(
|
||||||
area = bbox,
|
bbox = bbox,
|
||||||
osm_types = osm_types,
|
osm_types = osm_types,
|
||||||
selector = '"amenity"="toilets"',
|
selector = '"amenity"="toilets"',
|
||||||
out = 'ids center tags'
|
out = 'ids center tags'
|
||||||
)
|
)
|
||||||
self.logger.debug(f"Query: {query}")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = self.overpass.send_query(query)
|
result = self.overpass.fetch_data_from_api(query_str=query)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Error fetching landmarks: {e}")
|
self.logger.error(f"Error fetching landmarks: {e}")
|
||||||
return None
|
return None
|
||||||
|
Loading…
x
Reference in New Issue
Block a user