"""Module allowing connexion to overpass api and fectch data from OSM."""
|
|
import os
|
|
import time
|
|
import urllib
|
|
import math
|
|
import logging
|
|
import json
|
|
from typing import List, Tuple
|
|
|
|
from .caching_strategy import get_cache_key, CachingStrategy
|
|
from ..constants import OSM_CACHE_DIR, OSM_TYPES, BBOX
|
|
|
|
|
|
RESOLUTION = 0.05
|
|
CELL = Tuple[int, int]
|
|
|
|
|
|
class Overpass:
    """
    Overpass class to manage building queries and sending them to the Overpass API.

    The caching strategy is part of this class and is initialized upon creation of the Overpass object.
    """
    logger = logging.getLogger(__name__)

    def __init__(self, caching_strategy: str = 'JSON', cache_dir: str = OSM_CACHE_DIR):
        """
        Initialize the Overpass instance with the url, headers and caching strategy.
        """
        self.overpass_url = "https://overpass-api.de/api/interpreter"
        self.headers = {'User-Agent': 'Mozilla/5.0 (compatible; OverpassQuery/1.0; +http://example.com)'}
        self.caching_strategy = CachingStrategy.use(caching_strategy, cache_dir=cache_dir)

    def send_query(self, bbox: BBOX, osm_types: OSM_TYPES,
                   selector: str, conditions: list = None, out: str = 'center') -> List[dict]:
        """
        Send the Overpass QL query to the Overpass API and return the parsed json response.

        Args:
            bbox (tuple): Bounding box for the query.
            osm_types (list[str]): List of OSM element types (e.g., 'node', 'way').
            selector (str): Key or tag to filter OSM elements (e.g., 'highway').
            conditions (list): Optional list of additional filter conditions in Overpass QL format.
            out (str): Output format ('center', 'body', etc.). Defaults to 'center'.

        Returns:
            list: Parsed json response from the Overpass API, or cached data if available.
        """
        # Determine which grid cells overlap with this bounding box.
        overlapping_cells = Overpass._get_overlapping_cells(bbox)

        # Retrieve cached data and identify missing cache entries.
        cached_responses, non_cached_cells = self._retrieve_cached_data(overlapping_cells, osm_types, selector, conditions, out)

        self.logger.debug(f'Cache hit for {len(overlapping_cells)-len(non_cached_cells)}/{len(overlapping_cells)} quadrants.')

        # If there is no missing data, return the cached responses after filtering.
        if not non_cached_cells:
            return Overpass._filter_landmarks(cached_responses, bbox)

        # If there is no cached data, fetch everything from Overpass.
        if not cached_responses:
            query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
            self.logger.debug(f'Query string: {query_str}')
            return self.fetch_data_from_api(query_str)

        # Resize the bbox to the smaller, non-cached search area and build a new query string.
        non_cached_bbox = Overpass._get_non_cached_bbox(non_cached_cells, bbox)
        query_str = Overpass.build_query(non_cached_bbox, osm_types, selector, conditions, out)
        self.logger.debug(f'Query string: {query_str}')
        non_cached_responses = self.fetch_data_from_api(query_str)
        return Overpass._filter_landmarks(cached_responses, bbox) + non_cached_responses

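    # Illustrative usage of send_query (a minimal sketch; the bbox, osm_types and
    # selector values below are example assumptions, not project configuration):
    #
    #   overpass = Overpass()
    #   elements = overpass.send_query(
    #       bbox=(48.85, 2.29, 48.87, 2.31),
    #       osm_types=['node', 'way'],
    #       selector='"tourism"="attraction"',
    #       out='center',
    #   )
    #   # 'elements' is a list of dicts as returned by the Overpass API,
    #   # combining cached quadrants with freshly fetched ones.
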
    def fetch_data_from_api(self, query_str: str) -> List[dict]:
        """
        Fetch data from the Overpass API and return the json data.

        Args:
            query_str (str): The Overpass query string.

        Returns:
            list: List of elements parsed from the Overpass API response.
        """
        try:
            data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
            request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)

            with urllib.request.urlopen(request) as response:
                response_data = response.read().decode('utf-8')  # Convert the HTTPResponse to a string.
                data = json.loads(response_data)  # Load the JSON from the string.
                elements = data.get('elements', [])
                # self.logger.debug(f'Query = {query_str}')
                return elements

        except urllib.error.URLError as e:
            self.logger.error(f"Error connecting to Overpass API: {str(e)}")
            raise ConnectionError(f"Error connecting to Overpass API: {str(e)}") from e
        except Exception as exc:
            self.logger.error(f"Unexpected error while fetching data from Overpass: {str(exc)}")
            raise Exception(f'An unexpected error occurred: {str(exc)}') from exc

    def fill_cache(self, json_data: dict):
        """
        Fill the cache with data by using a hollow cache entry's information.
        """
        query_str, cache_key = Overpass._build_query_from_hollow(json_data)
        try:
            data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
            request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)

            with urllib.request.urlopen(request) as response:

                # Convert the HTTPResponse to a string and load the data.
                response_data = response.read().decode('utf-8')
                data = json.loads(response_data)

                # Get the elements and set the cache.
                elements = data.get('elements', [])
                self.caching_strategy.set(cache_key, elements)
                self.logger.debug(f'Cache set for {cache_key}')

        except urllib.error.URLError as e:
            raise ConnectionError(f"Error connecting to Overpass API: {str(e)}") from e
        except Exception as exc:
            raise Exception(f'An unexpected error occurred: {str(exc)}') from exc

    @staticmethod
    def build_query(bbox: BBOX, osm_types: OSM_TYPES,
                    selector: str, conditions: list = None, out: str = 'center') -> str:
        """
        Construct a query string for the Overpass API to retrieve OpenStreetMap (OSM) data.

        Args:
            bbox (tuple): A tuple representing the geographical search area, typically in the format
                          (lat_min, lon_min, lat_max, lon_max).
            osm_types (list[str]): A list of OSM element types to search for. Must be one or more of
                                   'Way', 'Node', or 'Relation'.
            selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.).
            conditions (list, optional): A list of conditions to apply as additional filters for the
                                         selected OSM elements. The conditions should be written in
                                         the Overpass QL format, and they are combined with '&&' if
                                         multiple are provided. Defaults to None.
            out (str, optional): Specifies the output type, such as 'center', 'body', or 'tags'.
                                 Defaults to 'center'.

        Returns:
            str: The constructed Overpass QL query string.

        Notes:
            - If no conditions are provided, the query will just use the `selector` to filter the OSM
              elements without additional constraints.
        """
        query = '[out:json][timeout:20];('

        # Convert the bbox to a string.
        bbox_str = f"({','.join(map(str, bbox))})"

        if conditions is not None and len(conditions) > 0:
            conditions = '(if: ' + ' && '.join(conditions) + ')'
        else:
            conditions = ''

        for elem in osm_types:
            query += elem + '[' + selector + ']' + conditions + bbox_str + ';'

        query += ');' + f'out {out};'

        return query

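    # Worked example (illustrative values, not taken from the project):
    # Overpass.build_query((48.85, 2.29, 48.87, 2.31), ['node', 'way'], '"amenity"="restaurant"')
    # produces the following query string (wrapped here for readability; the actual
    # string has no line breaks):
    #   [out:json][timeout:20];(node["amenity"="restaurant"](48.85,2.29,48.87,2.31);
    #   way["amenity"="restaurant"](48.85,2.29,48.87,2.31););out center;
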
    def _retrieve_cached_data(self, overlapping_cells: List[CELL], osm_types: OSM_TYPES,
                              selector: str, conditions: list, out: str) -> Tuple[List[dict], List[CELL]]:
        """
        Retrieve cached data and identify missing cache quadrants.

        Args:
            overlapping_cells (list): Cells to check for cached data.
            osm_types (list): OSM types (e.g., 'node', 'way').
            selector (str): Key or tag to filter OSM elements.
            conditions (list): Additional conditions to apply.
            out (str): Output format.

        Returns:
            tuple: A tuple containing:
                - cached_responses (list): List of cached data found.
                - non_cached_cells (list[tuple]): List of cells with missing data.
        """
        cell_key_dict = {}
        for cell in overlapping_cells:
            for elem in osm_types:
                key_str = f"{elem}[{selector}]{conditions}({','.join(map(str, cell))})"

            cell_key_dict[cell] = get_cache_key(key_str)

        cached_responses = []
        non_cached_cells = []

        # Retrieve the cached data and mark the missing entries as hollow.
        for cell, key in cell_key_dict.items():
            cached_data = self.caching_strategy.get(key)
            if cached_data is not None:
                cached_responses += cached_data
            else:
                self.caching_strategy.set_hollow(key, cell, osm_types, selector, conditions, out)
                non_cached_cells.append(cell)

        return cached_responses, non_cached_cells

    @staticmethod
    def _build_query_from_hollow(json_data: dict) -> Tuple[str, str]:
        """
        Build a query string using information from a hollow cache entry.
        """
        # Extract values from the JSON object.
        key = json_data.get('key')
        cell = tuple(json_data.get('cell'))
        bbox = Overpass._get_bbox_from_grid_cell(cell)
        osm_types = json_data.get('osm_types')
        selector = json_data.get('selector')
        conditions = json_data.get('conditions')
        out = json_data.get('out')

        query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
        return query_str, key

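    # Sketch of the JSON shape a hollow cache entry is expected to have (the field
    # names follow what _build_query_from_hollow() reads; the concrete values are
    # illustrative assumptions):
    #   {
    #       "key": "...",
    #       "cell": [977, 47],
    #       "osm_types": ["node", "way"],
    #       "selector": "\"amenity\"=\"restaurant\"",
    #       "conditions": [],
    #       "out": "center"
    #   }
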
    @staticmethod
    def _get_overlapping_cells(query_bbox: tuple) -> List[CELL]:
        """
        Return the set of all grid cells that overlap with the given bounding box.
        """
        # Extract the location from the query bbox.
        lat_min, lon_min, lat_max, lon_max = query_bbox

        min_lat_cell, min_lon_cell = Overpass._get_grid_cell(lat_min, lon_min)
        max_lat_cell, max_lon_cell = Overpass._get_grid_cell(lat_max, lon_max)

        overlapping_cells = set()
        for lat_idx in range(min_lat_cell, max_lat_cell + 1):
            for lon_idx in range(min_lon_cell, max_lon_cell + 1):
                overlapping_cells.add((lat_idx, lon_idx))

        return overlapping_cells

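    # Worked example (illustrative coordinates): with RESOLUTION = 0.05, the bbox
    # (48.84, 2.34, 48.87, 2.37) spans latitude cells 976-977 and longitude cells 46-47,
    # so _get_overlapping_cells returns {(976, 46), (976, 47), (977, 46), (977, 47)}.
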
    @staticmethod
    def _get_grid_cell(lat: float, lon: float) -> CELL:
        """
        Return the grid cell coordinates for a given latitude and longitude.

        Each grid cell is 0.05° latitude x 0.05° longitude in size.
        """
        lat_index = math.floor(lat / RESOLUTION)
        lon_index = math.floor(lon / RESOLUTION)
        return (lat_index, lon_index)

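    # Worked example (illustrative coordinates): with RESOLUTION = 0.05, a point at
    # lat=48.8566, lon=2.3522 maps to cell (floor(48.8566 / 0.05), floor(2.3522 / 0.05))
    # = (977, 47).
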
    @staticmethod
    def _get_bbox_from_grid_cell(cell: CELL) -> BBOX:
        """
        Return the bounding box for a given grid cell index.

        Each grid cell is RESOLUTION x RESOLUTION in size.

        The bounding box is returned as (min_lat, min_lon, max_lat, max_lon).
        """
        # Calculate the southwest (min_lat, min_lon) corner of the bounding box.
        min_lat = round(cell[0] * RESOLUTION, 2)
        min_lon = round(cell[1] * RESOLUTION, 2)

        # Calculate the northeast (max_lat, max_lon) corner of the bounding box.
        max_lat = round((cell[0] + 1) * RESOLUTION, 2)
        max_lon = round((cell[1] + 1) * RESOLUTION, 2)

        return (min_lat, min_lon, max_lat, max_lon)

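    # Worked example (illustrative cell): with RESOLUTION = 0.05, cell (977, 47) maps
    # back to the bounding box (48.85, 2.35, 48.9, 2.4).
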
    @staticmethod
    def _get_non_cached_bbox(non_cached_cells: List[CELL], original_bbox: BBOX):
        """
        Calculate the non-cached bounding box by excluding cached cells.

        Args:
            non_cached_cells (list): The list of cells that were not found in the cache.
            original_bbox (tuple): The original bounding box (min_lat, min_lon, max_lat, max_lon).

        Returns:
            tuple: The new bounding box that excludes cached cells, or None if all cells are cached.
        """
        if not non_cached_cells:
            return None  # All cells were cached.

        # Initialize the non-cached bounding box with extreme values.
        min_lat, min_lon, max_lat, max_lon = float('inf'), float('inf'), float('-inf'), float('-inf')

        # Iterate over non-cached cells to find the new bounding box.
        for cell in non_cached_cells:
            cell_min_lat, cell_min_lon, cell_max_lat, cell_max_lon = Overpass._get_bbox_from_grid_cell(cell)

            min_lat = min(min_lat, cell_min_lat)
            min_lon = min(min_lon, cell_min_lon)
            max_lat = max(max_lat, cell_max_lat)
            max_lon = max(max_lon, cell_max_lon)

        # If the bounding box was never updated, there is nothing left to fetch.
        if min_lat == float('inf') or min_lon == float('inf'):
            return None

        # Clamp the combined cell bbox to the original query bbox.
        return (max(min_lat, original_bbox[0]),
                max(min_lon, original_bbox[1]),
                min(max_lat, original_bbox[2]),
                min(max_lon, original_bbox[3]))

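    # Worked example (illustrative values): if the only non-cached cell is (977, 47),
    # its bbox is (48.85, 2.35, 48.9, 2.4); clamped against an original query bbox of
    # (48.84, 2.34, 48.87, 2.37) this yields (48.85, 2.35, 48.87, 2.37), so only the
    # missing part of the original area is re-queried.
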
    @staticmethod
    def _filter_landmarks(elements: List[dict], bbox: BBOX) -> List[dict]:
        """
        Filter elements based on whether their coordinates are inside the given bbox.

        Args:
            elements (list of dict): List of elements containing coordinates.
            bbox (tuple): A bounding box defined as (min_lat, min_lon, max_lat, max_lon).

        Returns:
            list: A list of elements whose coordinates are inside the bounding box.
        """
        filtered_elements = []
        min_lat, min_lon, max_lat, max_lon = bbox

        for elem in elements:
            # Extract coordinates based on the 'type' of element.
            if elem.get('type') != 'node':
                center = elem.get('center', {})
                lat = float(center.get('lat', 0))
                lon = float(center.get('lon', 0))
            else:
                lat = float(elem.get('lat', 0))
                lon = float(elem.get('lon', 0))

            # Check if the coordinates fall within the given bounding box.
            if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon:
                filtered_elements.append(elem)

        return filtered_elements


def get_base_info(elem: dict, osm_type: OSM_TYPES, with_name=False):
    """
    Extract base information (coordinates, OSM ID, and optionally a name) from an OSM element.

    This function retrieves the latitude and longitude coordinates, the OSM ID, and optionally the name
    of a given OpenStreetMap (OSM) element. It handles different OSM types (e.g., 'node', 'way') by
    extracting coordinates either directly or from a 'center' tag, depending on the element type.

    Args:
        elem (dict): The JSON element representing the OSM entity.
        osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates
                        are extracted directly from the element; otherwise, from the 'center' tag.
        with_name (bool): Whether to extract and return the name of the element. If True, it attempts
                          to find the 'name' tag within the element and return its value. Defaults to False.

    Returns:
        tuple: A tuple containing:
            - osm_id (str): The OSM ID of the element.
            - coords (tuple): A tuple of (latitude, longitude) coordinates.
            - name (str, optional): The name of the element if `with_name` is True; otherwise, not included.
    """
    # 1. Extract coordinates.
    if osm_type != 'node':
        center = elem.get('center')
        lat = float(center.get('lat'))
        lon = float(center.get('lon'))
    else:
        lat = float(elem.get('lat'))
        lon = float(elem.get('lon'))

    coords = tuple((lat, lon))

    # 2. Extract the OSM id.
    osm_id = elem.get('id')

    # 3. Extract the name if specified and return.
    if with_name:
        name = elem.get('tags', {}).get('name')
        return osm_id, coords, name

    return osm_id, coords

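# Illustrative example for get_base_info (the element below is a made-up node, not
# real OSM data):
#   elem = {'type': 'node', 'id': 123456, 'lat': 48.8584, 'lon': 2.2945,
#           'tags': {'name': 'Tour Eiffel'}}
#   get_base_info(elem, 'node', with_name=True)
#   -> (123456, (48.8584, 2.2945), 'Tour Eiffel')

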
def fill_cache():
    """
    Scan the specified cache directory for files starting with 'hollow_' and attempt to load
    their contents as JSON to fill the cache of the Overpass system.
    """
    overpass = Overpass()

    n_files = 0
    total = 0

    with os.scandir(OSM_CACHE_DIR) as it:
        for entry in it:
            if entry.is_file() and entry.name.startswith('hollow_'):
                total += 1
                try:
                    # Read the whole file content as a string.
                    with open(entry.path, 'r', encoding='utf-8') as f:
                        # Load the data and fill the cache with the query and key.
                        json_data = json.load(f)
                        overpass.fill_cache(json_data)
                        n_files += 1
                        time.sleep(1)
                    # Now delete the file as the cache is filled.
                    os.remove(entry.path)

                except Exception as exc:
                    overpass.logger.error(f'An error occurred while parsing file {entry.path} as .json file: {str(exc)}')

    overpass.logger.info(f"Successfully filled {n_files}/{total} cache files.")