started to implement overpass queries

This commit is contained in:
Kilian Scheidecker 2024-05-30 00:48:38 +02:00
parent 03da8441f2
commit d88f22121e
5 changed files with 87 additions and 51 deletions

View File

@ -1,5 +1,5 @@
from OSMPythonTools.api import Api
from OSMPythonTools.overpass import Overpass
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder, Nominatim
from dataclasses import dataclass
from pydantic import BaseModel
@ -29,6 +29,23 @@ def add_from_id(id: int, score: int) :
return Landmarkkkk(obj.tag('name:fr'), score, id) # create Landmark out of it
def get_sights(city_country: str):
nominatim = Nominatim()
areaId = nominatim.query(city_country).areaId()
overpass = Overpass()
# list of stuff we want to define as sights
l = ["'tourism'='museum'", "'tourism'='attraction'", "'tourism'='gallery'", 'historic', "'amenity'='arts_centre'", "'amenity'='planetarium'", '"amenity"="place_of_worship"']
score = 0
for amenity in l :
query = overpassQueryBuilder(area=areaId, elementType=['way', 'relation'], selector=amenity, includeGeometry=True)
result = overpass.query(query)
score += result.countElements()
return score
# take a lsit of tuples (id, score) to generate a list of landmarks
def generate_landmarks(ids_and_scores: list) :
@ -37,7 +54,7 @@ def generate_landmarks(ids_and_scores: list) :
L.append(add_from_id(tup[0], tup[1]))
return L
"""
api = Api()
@ -54,4 +71,8 @@ landmarks = generate_landmarks(ids_and_scores)
for obj in landmarks :
print(obj)
print(obj)"""
print(get_sights('Paris, France'))

View File

@ -1,22 +1,30 @@
from optimizer import solve_optimization
from .structs.landmarks import LandmarkTest
from .structs.preferences import Preferences
from fastapi import FastAPI
from structs.landmarks import LandmarkTest
from structs.landmarks import Landmark
from structs.preferences import Preferences
from fastapi import FastAPI, Query, Body
from typing import List
app = FastAPI()
# Assuming frontend is calling like this :
#"http://127.0.0.1:8000/process?param1={param1}&param2={param2}"
# This should become main at some point
@app.post("optimizer/{longitude}/{latitude}")
def get_data(longitude: float, latitude: float, preferences: Preferences) :
@app.post("/optimizer/{longitude}/{latitude}")
def main(longitude: float, latitude: float, prefrences: Preferences = Body(...)) -> List[Landmark]:
# From frontend get longitude, latitude and prefence list
return
landmarks = []
@app.get("optimizer/{max_steps}/{print_details}")
def main(max_steps: int, print_details: bool):
return landmarks
@app.get("test")
def test():
# CONSTRAINT TO RESPECT MAX NUMBER OF STEPS
#max_steps = 16
max_steps = 16
# Initialize all landmarks (+ start and goal). Order matters here
@ -30,13 +38,14 @@ def main(max_steps: int, print_details: bool):
landmarks.append(LandmarkTest("arrivée", -1, (0, 0)))
visiting_order = solve_optimization(landmarks, max_steps, print_details)
visiting_order = solve_optimization(landmarks, max_steps, True)
#return visiting_order
return visiting_order
# should return landmarks = the list of Landmark (ordered list)
return("max steps :", max_steps, "\n", visiting_order)
#return("max steps :", max_steps, "\n", visiting_order)
"""if __name__ == "__main__":
"""# keep this for debug
if __name__ == "__main__":
main()"""

View File

@ -1,12 +1,14 @@
from scipy.optimize import linprog
import numpy as np
from scipy.linalg import block_diag
from structs.landmarks import Landmark, LandmarkType
from structs.preferences import Preference, Preferences
# landmarks = [Landmark_1, Landmark_2, ...]
# Convert the solution of the optimization into the list of edges to follow. Order is taken into account
def untangle(resx: list) :
def untangle(resx: list) -> list:
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
n_edges = resx.sum() # number of edges
@ -34,7 +36,7 @@ def untangle(resx: list) :
return order
# Just to print the result
def print_res(res, landmarks: list, P) :
def print_res(res, landmarks: list, P) -> list:
X = abs(res.x)
order = untangle(X)
things = []
@ -63,9 +65,9 @@ def print_res(res, landmarks: list, P) :
# Checks for cases of circular symmetry in the result
def has_circle(resx: list) :
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
n_edges = resx.sum() # number of edges
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
n_edges = resx.sum() # number of edges
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
@ -103,18 +105,17 @@ def has_circle(resx: list) :
return []
# Constraint to not have d14 and d41 simultaneously. Does not prevent circular symmetry with more elements
def break_sym(landmarks, A_ub, b_ub):
L = len(landmarks)
upper_ind = np.triu_indices(L,0,L)
def break_sym(N, A_ub, b_ub):
upper_ind = np.triu_indices(N,0,N)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
for i, _ in enumerate(up_ind_x) :
l = [0]*L*L
l = [0]*N*N
if up_ind_x[i] != up_ind_y[i] :
l[up_ind_x[i]*L + up_ind_y[i]] = 1
l[up_ind_y[i]*L + up_ind_x[i]] = 1
l[up_ind_x[i]*N + up_ind_y[i]] = 1
l[up_ind_y[i]*N + up_ind_x[i]] = 1
A_ub = np.vstack((A_ub,l))
b_ub.append(1)
@ -126,8 +127,7 @@ def break_sym(landmarks, A_ub, b_ub):
return A_ub, b_ub
# Constraint to not have circular paths. Want to go from start -> finish without unconnected loops
def break_circle(landmarks, A_ub, b_ub, circle) :
N = len(landmarks)
def break_circle(N, A_ub, b_ub, circle) :
l = [0]*N*N
for index in circle :
@ -146,19 +146,18 @@ def break_circle(landmarks, A_ub, b_ub, circle) :
return A_ub, b_ub
# Constraint to respect max number of travels
def respect_number(landmarks, A_ub, b_ub):
def respect_number(N, A_ub, b_ub):
h = []
for i in range(len(landmarks)) : h.append([1]*len(landmarks))
for i in range(N) : h.append([1]*N)
T = block_diag(*h)
"""for l in T :
for i in range(7):
print(l[i*7:i*7+7])
print("\n")"""
return np.vstack((A_ub, T)), b_ub + [1]*len(landmarks)
return np.vstack((A_ub, T)), b_ub + [1]*N
# Constraint to tie the problem together. Necessary but not sufficient to avoid circles
def respect_order(landmarks: list, A_eq, b_eq):
N = len(landmarks)
def respect_order(N: int, A_eq, b_eq):
for i in range(N-1) : # Prevent stacked ones
if i == 0 :
continue
@ -185,16 +184,15 @@ def manhattan_distance(loc1: tuple, loc2: tuple):
return abs(x1 - x2) + abs(y1 - y2)
# Constraint to not stay in position
def init_eq_not_stay(landmarks):
L = len(landmarks)
l = [0]*L*L
def init_eq_not_stay(N: int):
l = [0]*N*N
for i in range(L) :
for j in range(L) :
for i in range(N) :
for j in range(N) :
if j == i :
l[j + i*L] = 1
l[L-1] = 1 # cannot skip from start to finish
l[j + i*N] = 1
l[N-1] = 1 # cannot skip from start to finish
#A_eq = np.array([np.array(xi) for xi in A_eq]) # Must convert A_eq into an np array
l = np.array(np.array(l))
@ -258,19 +256,21 @@ def path_length(P: list, resx: list) :
# Main optimization pipeline
def solve_optimization (landmarks, max_steps, printing_details) :
N = len(landmarks)
# SET CONSTRAINTS FOR INEQUALITY
c, A_ub, b_ub = init_ub_dist(landmarks, max_steps) # Add the distances from each landmark to the other
P = A_ub # store the paths for later. Needed to compute path length
A_ub, b_ub = respect_number(landmarks, A_ub, b_ub) # Respect max number of visits.
A_ub, b_ub = respect_number(N, A_ub, b_ub) # Respect max number of visits.
# TODO : Problems with circular symmetry
A_ub, b_ub = break_sym(landmarks, A_ub, b_ub) # break the symmetry. Only use the upper diagonal values
A_ub, b_ub = break_sym(N, A_ub, b_ub) # break the symmetry. Only use the upper diagonal values
# SET CONSTRAINTS FOR EQUALITY
A_eq, b_eq = init_eq_not_stay(landmarks) # Force solution not to stay in same place
A_eq, b_eq = init_eq_not_stay(N) # Force solution not to stay in same place
A_eq, b_eq, H = respect_user_mustsee(landmarks, A_eq, b_eq) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq, b_eq = respect_order(landmarks, A_eq, b_eq) # Respect order of visit (only works when max_steps is limiting factor)
A_eq, b_eq = respect_order(N, A_eq, b_eq) # Respect order of visit (only works when max_steps is limiting factor)
# Bounds for variables (x can only be 0 or 1)
x_bounds = [(0, 1)] * len(c)

View File

View File

@ -13,12 +13,18 @@ class Landmark(BaseModel) :
type: LandmarkType # De facto mapping depending on how the query was executed with overpass. Should still EXACTLY correspond to the preferences
location : tuple
# loop through the preferences and assign a score to the landmark
def score(self, preferences: Preferences):
def score(preferences: Preferences):
# loop through the preferences and assign a score
return 29
for preference_name, preference in preferences.__dict__.items():
if (preference_name == self.type.landmark_type) :
score = preference.score
if (not score) :
raise Exception(f"Could not determine score for landmark {self.name}")
else :
return score