reviewed code structure, cleaned comments, now pep8 conform
All checks were successful
Build and push docker image / Build (pull_request) Successful in 2m17s
Build and release APK / Build APK (pull_request) Successful in 6m53s
Build web / Build Web (pull_request) Successful in 1m31s

This commit is contained in:
Kilian Scheidecker 2024-06-11 20:14:12 +02:00
parent af4d68f36f
commit 111e6836f6
10 changed files with 423 additions and 500 deletions

View File

@ -0,0 +1,11 @@
'leisure'='park'
geological
'natural'='geyser'
'natural'='hot_spring'
'natural'='arch'
'natural'='volcano'
'natural'='stone'
'tourism'='alpine_hut'
'tourism'='viewpoint'
'tourism'='zoo'
'waterway'='waterfall'

View File

@ -0,0 +1,2 @@
'shop'='department_store'
'shop'='mall'

View File

@ -0,0 +1,9 @@
'tourism'='museum'
'tourism'='attraction'
'tourism'='gallery'
historic
'amenity'='arts_centre'
'amenity'='planetarium'
'amenity'='place_of_worship'
'amenity'='fountain'
'water'='reflecting_pool'

View File

@ -1,21 +1,13 @@
import math as m
import json, os
from OSMPythonTools.api import Api
from typing import List, Tuple
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder, Nominatim
from structs.landmarks import Landmark, LandmarkType
from structs.preferences import Preferences, Preference
from typing import List
from typing import Tuple
BBOX_SIDE = 10 # size of bbox in *km* for general area, 10km
RADIUS_CLOSE_TO = 27.5 # size of area in *m* for close features, 30m radius
MIN_SCORE = 30 # DEPRECIATED. discard elements with score < 30
MIN_TAGS = 5 # DEPRECIATED. discard elements withs less than 5 tags
CHURCH_PENALTY = 0.6 # penalty to reduce score of curches
PARK_COEFF = 1.4 # multiplier for parks
N_IMPORTANT = 40 # take the 30 most important landmarks
SIGHTSEEING = LandmarkType(landmark_type='sightseeing')
NATURE = LandmarkType(landmark_type='nature')
SHOPPING = LandmarkType(landmark_type='shopping')
@ -23,71 +15,64 @@ SHOPPING = LandmarkType(landmark_type='shopping')
# Include the json here
# Create a list of all things to visit given some preferences and a city. Ready for the optimizer
def generate_landmarks(preferences: Preferences, city_country: str = None, coordinates: Tuple[float, float] = None)->Tuple[List[Landmark], List[Landmark]] :
def generate_landmarks(preferences: Preferences, city_country: str = None, coordinates: Tuple[float, float] = None) -> Tuple[List[Landmark], List[Landmark]] :
l_sights = ["'tourism'='museum'", "'tourism'='attraction'", "'tourism'='gallery'", 'historic', "'amenity'='arts_centre'", "'amenity'='planetarium'", "'amenity'='place_of_worship'", "'amenity'='fountain'", '"water"="reflecting_pool"']
l_nature = ["'leisure'='park'", 'geological', "'natural'='geyser'", "'natural'='hot_spring'", '"natural"="arch"', '"natural"="cave_entrance"', '"natural"="volcano"', '"natural"="stone"', '"tourism"="alpine_hut"', '"tourism"="picnic_site"', '"tourism"="viewpoint"', '"tourism"="zoo"', '"waterway"="waterfall"']
l_shop = ["'shop'='department_store'", "'shop'='mall'"] #, '"shop"="collector"', '"shop"="antiques"']
l_sights, l_nature, l_shop = get_amenities()
L = []
# Use 'City, Country'
if city_country is not None :
# List for sightseeing
if preferences.sightseeing.score != 0 :
L1 = get_landmarks_nominatim(city_country, l_sights, SIGHTSEEING)
correct_score(L1, preferences.sightseeing)
L += L1
# List for nature
if preferences.nature.score != 0 :
L2 = get_landmarks_nominatim(city_country, l_nature, NATURE)
correct_score(L2, preferences.nature)
L += L2
# List for shopping
if preferences.shopping.score != 0 :
L3 = get_landmarks_nominatim(city_country, l_shop, SHOPPING)
correct_score(L3, preferences.shopping)
L += L3
# Use coordinates
elif coordinates is not None :
# List for sightseeing
if preferences.sightseeing.score != 0 :
L1 = get_landmarks_coords(coordinates, l_sights, SIGHTSEEING)
correct_score(L1, preferences.sightseeing)
L += L1
# List for nature
if preferences.nature.score != 0 :
L2 = get_landmarks_coords(coordinates, l_nature, NATURE)
correct_score(L2, preferences.nature)
L += L2
# List for shopping
if preferences.shopping.score != 0 :
L3 = get_landmarks_coords(coordinates, l_shop, SHOPPING)
correct_score(L3, preferences.shopping)
L += L3
# List for sightseeing
if preferences.sightseeing.score != 0 :
L1 = get_landmarks(l_sights, SIGHTSEEING, city_country=city_country, coordinates=coordinates)
correct_score(L1, preferences.sightseeing)
L += L1
# List for nature
if preferences.nature.score != 0 :
L2 = get_landmarks(l_nature, NATURE, city_country=city_country, coordinates=coordinates)
correct_score(L2, preferences.nature)
L += L2
# List for shopping
if preferences.shopping.score != 0 :
L3 = get_landmarks(l_shop, SHOPPING, city_country=city_country, coordinates=coordinates)
correct_score(L3, preferences.shopping)
L += L3
return remove_duplicates(L), take_most_important(L)
#return L, cleanup_list(L)
# Determines if two locations are close to each other
def is_close_to(loc1: Tuple[float, float], loc2: Tuple[float, float])->bool :
alpha = (180*RADIUS_CLOSE_TO)/(6371000*m.pi)
if abs(loc1[0] - loc2[0]) + abs(loc1[1] - loc2[1]) < alpha*2 :
return True
else :
return False
# Helper function to gather the amenities list
def get_amenities() -> List[List[str]] :
# Get the list of amenities from the files
sightseeing = get_list('/amenities/sightseeing.am')
nature = get_list('/amenities/nature.am')
shopping = get_list('/amenities/shopping.am')
return sightseeing, nature, shopping
# Helper function to read a .am file and generate the corresponding list
def get_list(path: str) -> List[str] :
with open(os.path.dirname(os.path.abspath(__file__)) + path) as f :
content = f.readlines()
amenities = []
for line in content :
amenities.append(line.strip('\n'))
return amenities
# Take the most important landmarks from the list
def take_most_important(L: List[Landmark])->List[Landmark] :
def take_most_important(L: List[Landmark]) -> List[Landmark] :
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f :
parameters = json.loads(f.read())
N_important = parameters['N important']
L_copy = []
L_clean = []
scores = [0]*len(L)
@ -110,11 +95,12 @@ def take_most_important(L: List[Landmark])->List[Landmark] :
for old in L_copy :
if old.name == elem.name :
old.attractiveness = L[t].attractiveness
scores = [0]*len(L_copy)
for i, elem in enumerate(L_copy) :
scores[i] = elem.attractiveness
res = sorted(range(len(scores)), key = lambda sub: scores[sub])[-N_IMPORTANT:]
res = sorted(range(len(scores)), key = lambda sub: scores[sub])[-N_important:]
for i, elem in enumerate(L_copy) :
if i in res :
@ -122,21 +108,23 @@ def take_most_important(L: List[Landmark])->List[Landmark] :
return L_clean
# Remove duplicate elements and elements with low score
def remove_duplicates(L: List[Landmark])->List[Landmark] :
def remove_duplicates(L: List[Landmark]) -> List[Landmark] :
L_clean = []
names = []
for landmark in L :
if landmark.name in names : # Remove duplicates
if landmark.name in names :
continue
else :
names.append(landmark.name)
L_clean.append(landmark)
return L_clean
# Correct the score of a list of landmarks by taking into account preferences and the number of tags
# Correct the score of a list of landmarks by taking into account preference settings
def correct_score(L: List[Landmark], preference: Preference) :
if len(L) == 0 :
@ -148,53 +136,32 @@ def correct_score(L: List[Landmark], preference: Preference) :
for elem in L :
elem.attractiveness = int(elem.attractiveness*preference.score/500) # arbitrary computation
# Correct the score of a list of landmarks by taking into account preferences and the number of tags
def correct_score_test(L: List[Landmark], preference: Preference) :
if len(L) == 0 :
return
if L[0].type != preference.type :
raise TypeError(f"LandmarkType {preference.type} does not match the type of Landmark {L[0].name}")
for elem in L :
elem.attractiveness = int(elem.attractiveness/100) + elem.n_tags # arbitrary correction of the balance score vs number of tags
elem.attractiveness = elem.attractiveness*preference.score # arbitrary computation
# Function to count elements within a 25m radius of a location
def count_elements_within_radius(coordinates: Tuple[float, float]) -> int:
# Function to count elements within a certain radius of a location
def count_elements_within_radius(coordinates: Tuple[float, float], radius: int) -> int:
lat = coordinates[0]
lon = coordinates[1]
alpha = (180*RADIUS_CLOSE_TO)/(6371000*m.pi)
alpha = (180*radius)/(6371000*m.pi)
bbox = {'latLower':lat-alpha,'lonLower':lon-alpha,'latHigher':lat+alpha,'lonHigher': lon+alpha}
overpass = Overpass()
# Build the query to find elements within the radius
radius_query = overpassQueryBuilder(bbox=[bbox['latLower'],bbox['lonLower'],bbox['latHigher'],bbox['lonHigher']],
elementType=['node', 'way', 'relation'])
try :
overpass = Overpass()
radius_result = overpass.query(radius_query)
# The count is the number of elements found
return radius_result.countElements()
except :
return None
# Creates a bounding box around precise coordinates
# Creates a bounding box around given coordinates
def create_bbox(coordinates: Tuple[float, float], side_length: int) -> Tuple[float, float, float, float]:
"""
Create a simple bounding box around given coordinates.
:param coordinates: tuple (lat, lon)
-> lat: Latitude of the center point.
-> lon: Longitude of the center point.
:param side_length: int - side length of the bbox in km
:return: Bounding box as (min_lat, min_lon, max_lat, max_lon).
"""
lat = coordinates[0]
lon = coordinates[1]
@ -214,18 +181,26 @@ def create_bbox(coordinates: Tuple[float, float], side_length: int) -> Tuple[flo
return min_lat, min_lon, max_lat, max_lon
# Generates the list of landmarks for a given Landmarktype. Needs coordinates, a list of amenities and the corresponding LandmarkType
def get_landmarks_coords(coordinates: Tuple[float, float], l: List[Landmark], landmarktype: LandmarkType)->List[Landmark]:
overpass = Overpass()
def get_landmarks_coords(coordinates: Tuple[float, float], list_amenity: list, landmarktype: LandmarkType) -> List[Landmark]:
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f :
parameters = json.loads(f.read())
tag_coeff = parameters['tag coeff']
park_coeff = parameters['park coeff']
church_coeff = parameters['church coeff']
radius = parameters['radius close to']
bbox_side = parameters['city bbox side']
# Generate a bbox around current coordinates
bbox = create_bbox(coordinates, BBOX_SIDE)
bbox = create_bbox(coordinates, bbox_side)
# Initialize some variables
overpass = Overpass()
N = 0
L = []
for amenity in l :
for amenity in list_amenity :
query = overpassQueryBuilder(bbox=bbox, elementType=['way', 'relation'], selector=amenity, includeCenter=True, out='body')
result = overpass.query(query)
N += result.countElements()
@ -235,16 +210,15 @@ def get_landmarks_coords(coordinates: Tuple[float, float], l: List[Landmark], la
name = elem.tag('name') # Add name, decode to ASCII
location = (elem.centerLat(), elem.centerLon()) # Add coordinates (lat, lon)
# skip if unprecise location
# Skip if unprecise location
if name is None or location[0] is None:
continue
# skip if unused
# Skip if unused
if 'disused:leisure' in elem.tags().keys():
continue
else :
osm_type = elem.type() # Add type : 'way' or 'relation'
osm_id = elem.id() # Add OSM id
elem_type = landmarktype # Add the landmark type as 'sightseeing
@ -252,12 +226,12 @@ def get_landmarks_coords(coordinates: Tuple[float, float], l: List[Landmark], la
# Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT
if amenity == "'amenity'='place_of_worship'" :
score = int((count_elements_within_radius(location) + n_tags*100 )*CHURCH_PENALTY)
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*church_coeff)
elif amenity == "'leisure'='park'" :
score = int((count_elements_within_radius(location) + n_tags*100 )*PARK_COEFF)
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*park_coeff)
else :
score = count_elements_within_radius(location) + n_tags*100
score = count_elements_within_radius(location, radius) + n_tags*tag_coeff
if score is not None :
# Generate the landmark and append it to the list
landmark = Landmark(name=name, type=elem_type, location=location, osm_type=osm_type, osm_id=osm_id, attractiveness=score, must_do=False, n_tags=n_tags)
@ -265,7 +239,15 @@ def get_landmarks_coords(coordinates: Tuple[float, float], l: List[Landmark], la
return L
def get_landmarks_nominatim(city_country: str, l: List[Landmark], landmarktype: LandmarkType)->List[Landmark] :
def get_landmarks_nominatim(city_country: str, list_amenity: list, landmarktype: LandmarkType) -> List[Landmark] :
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f :
parameters = json.loads(f.read())
tag_coeff = parameters['tag coeff']
park_coeff = parameters['park coeff']
church_coeff = parameters['church coeff']
radius = parameters['radius close to']
overpass = Overpass()
nominatim = Nominatim()
@ -275,7 +257,7 @@ def get_landmarks_nominatim(city_country: str, l: List[Landmark], landmarktype:
N = 0
L = []
for amenity in l :
for amenity in list_amenity :
query = overpassQueryBuilder(area=areaId, elementType=['way', 'relation'], selector=amenity, includeCenter=True, out='body')
result = overpass.query(query)
N += result.countElements()
@ -294,18 +276,96 @@ def get_landmarks_nominatim(city_country: str, l: List[Landmark], landmarktype:
continue
else :
osm_type = elem.type() # Add type : 'way' or 'relation'
osm_id = elem.id() # Add OSM id
elem_type = landmarktype # Add the landmark type as 'sightseeing
n_tags = len(elem.tags().keys()) # Add number of tags
# Add score of given landmark based on the number of surrounding elements
# Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT
if amenity == "'amenity'='place_of_worship'" :
score = int(count_elements_within_radius(location)*CHURCH_PENALTY)
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*church_coeff)
elif amenity == "'leisure'='park'" :
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*park_coeff)
else :
score = count_elements_within_radius(location)
score = count_elements_within_radius(location, radius) + n_tags*tag_coeff
if score is not None :
# Generate the landmark and append it to the list
landmark = Landmark(name=name, type=elem_type, location=location, osm_type=osm_type, osm_id=osm_id, attractiveness=score, must_do=False, n_tags=n_tags)
L.append(landmark)
return L
def get_landmarks(list_amenity: list, landmarktype: LandmarkType, city_country: str = None, coordinates: Tuple[float, float] = None) -> List[Landmark] :
if city_country is None and coordinates is None :
raise ValueError("Either one of 'city_country' and 'coordinates' arguments must be specified")
if city_country is not None and coordinates is not None :
raise ValueError("Cannot specify both 'city_country' and 'coordinates' at the same time, please choose either one")
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f :
parameters = json.loads(f.read())
tag_coeff = parameters['tag coeff']
park_coeff = parameters['park coeff']
church_coeff = parameters['church coeff']
radius = parameters['radius close to']
bbox_side = parameters['city bbox side']
# If city_country is specified :
if city_country is not None :
nominatim = Nominatim()
areaId = nominatim.query(city_country).areaId()
bbox = None
# If coordinates are specified :
elif coordinates is not None :
bbox = create_bbox(coordinates, bbox_side)
areaId = None
else :
raise ValueError("Argument number is not corresponding.")
# Initialize some variables
N = 0
L = []
overpass = Overpass()
for amenity in list_amenity :
query = overpassQueryBuilder(area=areaId, bbox=bbox, elementType=['way', 'relation'], selector=amenity, includeCenter=True, out='body')
result = overpass.query(query)
N += result.countElements()
for elem in result.elements():
name = elem.tag('name') # Add name
location = (elem.centerLat(), elem.centerLon()) # Add coordinates (lat, lon)
# skip if unprecise location
if name is None or location[0] is None:
continue
# skip if unused
if 'disused:leisure' in elem.tags().keys():
continue
else :
osm_type = elem.type() # Add type : 'way' or 'relation'
osm_id = elem.id() # Add OSM id
elem_type = landmarktype # Add the landmark type as 'sightseeing
n_tags = len(elem.tags().keys()) # Add number of tags
# Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT
if amenity == "'amenity'='place_of_worship'" :
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*church_coeff)
elif amenity == "'leisure'='park'" :
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*park_coeff)
else :
score = count_elements_within_radius(location, radius) + n_tags*tag_coeff
if score is not None :
# Generate the landmark and append it to the list

View File

@ -12,49 +12,36 @@ app = FastAPI()
# Assuming frontend is calling like this :
#"http://127.0.0.1:8000/process?param1={param1}&param2={param2}"
@app.post("/optimizer_coords/{longitude}/{latitude}/{city_country}")
def main1(preferences: Preferences = Body(...), longitude: float = None, latitude: float = None, city_country: str = None) -> List[Landmark]:
@app.post("/optimizer_coords/{latitude}/{longitude}/{city_country}")
def main1(preferences: Preferences = Body(...), latitude: float = None, longitude: float = None, city_country: str = None) -> List[Landmark]:
if preferences is None :
raise ValueError("Please provide preferences in the form of a 'Preference' BaseModel class.")
elif latitude is None and longitude is None and city_country is None :
raise ValueError("Please provide GPS coordinates or a 'city_country' string.")
elif latitude is not None and longitude is not None and city_country is not None :
raise ValueError("Please provide EITHER GPS coordinates or a 'city_country' string.")
# From frontend get longitude, latitude and prefence list
if city_country is None :
coordinates = tuple((latitude, longitude))
# Generate the landmark list
landmarks = generate_landmarks(preferences=preferences, city_country=city_country, coordinates=tuple((longitude, latitude)))
# Set the max distance
max_steps = 90
# Compute the visiting order
visiting_order = solve_optimization(landmarks, max_steps, True)
return visiting_order
@app.get("test")
def test():
# CONSTRAINT TO RESPECT MAX NUMBER OF STEPS
max_steps = 16
[], landmarks_short = generate_landmarks(preferences=preferences, city_country=city_country, coordinates=coordinates)
start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=(48.8375946, 2.2949904), osm_type='start', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.8375946, 2.2949904), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
# Initialize all landmarks (+ start and goal). Order matters here
landmarks = []
landmarks.append(LandmarkTest("départ", -1, (0, 0)))
landmarks.append(LandmarkTest("tour eiffel", 99, (0,2))) # PUT IN JSON
landmarks.append(LandmarkTest("arc de triomphe", 99, (0,4)))
landmarks.append(LandmarkTest("louvre", 99, (0,6)))
landmarks.append(LandmarkTest("montmartre", 99, (0,10)))
landmarks.append(LandmarkTest("concorde", 99, (0,8)))
landmarks.append(LandmarkTest("arrivée", -1, (0, 0)))
landmarks_short.insert(0, start)
landmarks_short.append(finish)
max_walking_time = 4 # hours
visiting_order = solve_optimization(landmarks, max_steps, True)
visiting_list = solve_optimization(landmarks_short, max_walking_time*60, True)
return visiting_order
return visiting_list
# should return landmarks = the list of Landmark (ordered list)
#return("max steps :", max_steps, "\n", visiting_order)
# input city, country in the form of 'Paris, France'

View File

@ -1,85 +1,17 @@
import numpy as np
import json, os
from typing import List
from typing import Tuple
from typing import List, Tuple
from scipy.optimize import linprog
from scipy.linalg import block_diag
from structs.landmarks import Landmark
from math import radians, sin, cos, acos
DETOUR_FACTOR = 1.3 # detour factor for straightline distance
AVG_WALKING_SPEED = 4.8 # average walking speed in km/h
# Function that returns the distance in meters from one location to another
def get_distance(p1: Tuple[float, float], p2: Tuple[float, float]) :
# Compute the straight-line distance in km
if p1 == p2 :
return 0, 0
else:
dist = 6371.01 * acos(sin(radians(p1[0]))*sin(radians(p2[0])) + cos(radians(p1[0]))*cos(radians(p2[0]))*cos(radians(p1[1]) - radians(p2[1])))
# Consider the detour factor for average city
wdist = dist*DETOUR_FACTOR
# Time to walk this distance (in minutes)
wtime = wdist/AVG_WALKING_SPEED*60
if wtime > 15 :
wtime = 5*round(wtime/5)
else :
wtime = round(wtime)
return round(wdist, 1), wtime
# landmarks = [Landmark_1, Landmark_2, ...]
# Convert the solution of the optimization into the list of edges to follow. Order is taken into account
def untangle(resx: list) -> list:
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
n_edges = resx.sum() # number of edges
order = []
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
indx = nonzero_tup[0].tolist()
indy = nonzero_tup[1].tolist()
vert = (indx[0], indy[0])
order.append(vert[0])
order.append(vert[1])
while len(order) < n_edges + 1 :
ind = indx.index(vert[1])
vert = (indx[ind], indy[ind])
order.append(vert[1])
return order
from structs.landmarks import Landmark
# Just to print the result
def print_res(L: List[Landmark], landmarks: List[Landmark], P) -> list:
# Function to print the result
def print_res(L: List[Landmark], L_tot) -> list:
"""N = int(np.sqrt(len(X)))
for i in range(N):
print(X[i*N:i*N+N])
print("Optimal value:", -res.fun) # Minimization, so we negate to get the maximum
print("Optimal point:", res.x)
for i,x in enumerate(X) : X[i] = round(x,0)
print(order)"""
if len(L) == len(landmarks):
if len(L) == L_tot:
print('\nAll landmarks can be visited within max_steps, the following order is suggested : ')
else :
print('Could not visit all the landmarks, the following order is suggested : ')
@ -92,26 +24,20 @@ def print_res(L: List[Landmark], landmarks: List[Landmark], P) -> list:
else :
print('- ' + elem.name)
#steps = path_length(P, abs(res.x))
print("\nMinutes walked : " + str(dist))
print(f"\nVisited {len(L)} out of {len(landmarks)} landmarks")
return
print(f"Visited {len(L)} out of {L_tot} landmarks")
# prevent the creation of similar circles
def prevent_circle(resx, landmarks: List[Landmark], A_ub, b_ub) -> bool:
# Prevent the use of a particular set of nodes
def prevent_config(resx, A_ub, b_ub) -> bool:
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
n_edges = resx.sum() # number of edges
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
N = len(resx) # Number of edges
L = int(np.sqrt(N)) # Number of landmarks
nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
@ -130,19 +56,16 @@ def prevent_circle(resx, landmarks: List[Landmark], A_ub, b_ub) -> bool:
return A_ub, b_ub
def break_circle2(circle_vertices, landmarks: List[Landmark], A_ub, b_ub) -> bool:
L = len(landmarks) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
# Prevent the possibility of a given set of vertices
def break_cricle(circle_vertices: list, L: int, A_ub: list, b_ub: list) -> bool:
if L-1 in circle_vertices :
circle_vertices.remove(L-1)
ones = [1]*L
h = [0]*L*L
for i in range(L) :
if i in circle_vertices :
h[i*L:i*L+L] = ones
h[i*L:i*L+L] = [1]*L
A_ub = np.vstack((A_ub, h))
b_ub.append(len(circle_vertices)-1)
@ -150,8 +73,10 @@ def break_circle2(circle_vertices, landmarks: List[Landmark], A_ub, b_ub) -> boo
return A_ub, b_ub
# Checks if the path is connected
def is_connected(resx, landmarks: List[Landmark]) -> bool:
# Checks if the path is connected, returns a circle if it finds one
def is_connected(resx) -> bool:
# first round the results to have only 0-1 values
for i, elem in enumerate(resx):
resx[i] = round(elem)
@ -159,7 +84,6 @@ def is_connected(resx, landmarks: List[Landmark]) -> bool:
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
n_edges = resx.sum() # number of edges
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
@ -178,12 +102,9 @@ def is_connected(resx, landmarks: List[Landmark]) -> bool:
for i, a in enumerate(ind_a) :
edges.append((a, ind_b[i])) # Create the list of edges
flag = False
remaining = edges
remaining.remove(edge1)
# This can be further optimized
#while len(vertices_visited) < n_edges + 1 :
break_flag = False
while len(remaining) > 0 and not break_flag:
for edge2 in remaining :
@ -192,7 +113,6 @@ def is_connected(resx, landmarks: List[Landmark]) -> bool:
edges_visited.append(edge2)
break_flag = True
break
#continue # continue vs break vs needed at all ?
else :
vertices_visited.append(edge1[1])
edges_visited.append(edge2)
@ -202,171 +122,131 @@ def is_connected(resx, landmarks: List[Landmark]) -> bool:
elif edge1[1] == L-1 or edge1[1] in vertices_visited:
break_flag = True
break
#break
#if flag is True :
# break
vertices_visited.append(edge1[1])
if len(vertices_visited) == n_edges +1 :
flag = True
circle = []
return vertices_visited, []
else:
flag = False
circle = edges_visited
"""j = 0
for i in vertices_visited :
if landmarks[i].name == 'start' :
ordered_visit = vertices_visited[j:] + vertices_visited[:j]
break
j+=1"""
return vertices_visited, edges_visited
return flag, vertices_visited, circle
# Checks for cases of circular symmetry in the result
def has_circle(resx: list) :
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
n_edges = resx.sum() # number of edges
# Function that returns the distance in meters from one location to another
def get_distance(p1: Tuple[float, float], p2: Tuple[float, float], detour: float, speed: float) :
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
# Compute the straight-line distance in km
if p1 == p2 :
return 0, 0
else:
dist = 6371.01 * acos(sin(radians(p1[0]))*sin(radians(p2[0])) + cos(radians(p1[0]))*cos(radians(p2[0]))*cos(radians(p1[1]) - radians(p2[1])))
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
# Consider the detour factor for average city
wdist = dist*detour
indx = nonzero_tup[0].tolist()
indy = nonzero_tup[1].tolist()
# Time to walk this distance (in minutes)
wtime = wdist/speed*60
if wtime > 15 :
wtime = 5*round(wtime/5)
else :
wtime = round(wtime)
return round(wdist, 1), wtime
# Initialize A and c. Compute the distances from all landmarks to each other and store attractiveness
# We want to maximize the sightseeing : max(c) st. A*x < b and A_eq*x = b_eq
def init_ub_dist(landmarks: List[Landmark], max_steps: int):
verts = []
for i, x in enumerate(indx) :
verts.append((x, indy[i]))
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour = parameters['detour factor']
speed = parameters['average walking speed']
for vert in verts :
visited = []
visited.append(vert)
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = []
# Coefficients of inequality constraints (left-hand side)
A_ub = []
while len(visited) < n_edges + 1 :
for spot1 in landmarks :
dist_table = [0]*len(landmarks)
c.append(-spot1.attractiveness)
for j, spot2 in enumerate(landmarks) :
t = get_distance(spot1.location, spot2.location, detour, speed)[1]
dist_table[j] = t
A_ub += dist_table
c = c*len(landmarks)
try :
ind = indx.index(vert[1])
return c, A_ub, [max_steps]
vert = (indx[ind], indy[ind])
if vert in visited :
return visited
else :
visited.append(vert)
except :
break
# Constraint to respect max number of travels
def respect_number(L, A_ub, b_ub):
ones = [1]*L
zeros = [0]*L
for i in range(L) :
h = zeros*i + ones + zeros*(L-1-i)
A_ub = np.vstack((A_ub, h))
b_ub.append(1)
return A_ub, b_ub
return []
# Constraint to not have d14 and d41 simultaneously. Does not prevent circular symmetry with more elements
def break_sym(N, A_ub, b_ub):
upper_ind = np.triu_indices(N,0,N)
def break_sym(L, A_ub, b_ub):
upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
for i, _ in enumerate(up_ind_x) :
l = [0]*N*N
l = [0]*L*L
if up_ind_x[i] != up_ind_y[i] :
l[up_ind_x[i]*N + up_ind_y[i]] = 1
l[up_ind_y[i]*N + up_ind_x[i]] = 1
l[up_ind_x[i]*L + up_ind_y[i]] = 1
l[up_ind_y[i]*L + up_ind_x[i]] = 1
A_ub = np.vstack((A_ub,l))
b_ub.append(1)
"""for i in range(7):
print(l[i*7:i*7+7])
print("\n")"""
return A_ub, b_ub
# Constraint to not have circular paths. Want to go from start -> finish without unconnected loops
def break_circle(L, A_ub, b_ub, circle) :
l = [0]*L*L
for index in circle :
x = index[0]
y = index[1]
l[x*L+y] = 1
A_ub = np.vstack((A_ub,l))
b_ub.append(len(circle)-1)
"""print("\n\nPREVENT CIRCLE")
for i in range(7):
print(l[i*7:i*7+7])
print("\n")"""
return A_ub, b_ub
# Constraint to respect max number of travels
def respect_number(N, A_ub, b_ub):
"""h = []
for i in range(N) : h.append([1]*N)
T = block_diag(*h)
for l in T :
for i in range(7):
print(l[i*7:i*7+7])
print("\n")"""
#return np.vstack((A_ub, T)), b_ub + [1]*N
ones = [1]*N
zeros = [0]*N
for i in range(N) :
h = zeros*i + ones + zeros*(N-1-i)
A_ub = np.vstack((A_ub, h))
b_ub.append(1)
return A_ub, b_ub
# Constraint to tie the problem together. Necessary but not sufficient to avoid circles
def respect_order(N: int, A_eq, b_eq):
for i in range(N-1) : # Prevent stacked ones
if i == 0 or i == N-1: # Don't touch start or finish
continue
else :
l = [0]*N
l[i] = -1
l = l*N
for j in range(N) :
l[i*N + j] = 1
A_eq = np.vstack((A_eq,l))
b_eq.append(0)
return A_eq, b_eq
# Compute manhattan distance between 2 locations
def manhattan_distance(loc1: tuple, loc2: tuple):
x1, y1 = loc1
x2, y2 = loc2
return abs(x1 - x2) + abs(y1 - y2)
# Constraint to not stay in position. Removes d11, d22, d33, etc.
def init_eq_not_stay(N: int):
l = [0]*N*N
def init_eq_not_stay(L: int):
l = [0]*L*L
for i in range(N) :
for j in range(N) :
for i in range(L) :
for j in range(L) :
if j == i :
l[j + i*N] = 1
l[j + i*L] = 1
l = np.array(np.array(l))
return [l], [0]
# Go through the landmarks and force the optimizer to use landmarks where attractiveness is set to -1
def respect_user_mustsee(landmarks: List[Landmark], A_eq: list, b_eq: list) :
L = len(landmarks)
for i, elem in enumerate(landmarks) :
if elem.must_do is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
for j in range(L) : # sets the horizontal ones (go from)
l[j +i*L] = 1 # sets the vertical ones (go to) double check if good
for k in range(L-1) :
l[k*L+L-1] = 1
A_eq = np.vstack((A_eq,l))
b_eq.append(2)
return A_eq, b_eq
# Constraint to ensure start at start and finish at goal
def respect_start_finish(L: int, A_eq: list, b_eq: list):
ls = [1]*L + [0]*L*(L-1) # sets only horizontal ones for start (go from)
@ -391,144 +271,101 @@ def respect_start_finish(L: int, A_eq: list, b_eq: list):
return A_eq, b_eq
# Initialize A and c. Compute the distances from all landmarks to each other and store attractiveness
# We want to maximize the sightseeing : max(c) st. A*x < b and A_eq*x = b_eq
def init_ub_dist(landmarks: List[Landmark], max_steps: int):
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = []
# Coefficients of inequality constraints (left-hand side)
A_ub = []
for i, spot1 in enumerate(landmarks) :
dist_table = [0]*len(landmarks)
c.append(-spot1.attractiveness)
for j, spot2 in enumerate(landmarks) :
d, t = get_distance(spot1.location, spot2.location)
dist_table[j] = t
A_ub += dist_table
c = c*len(landmarks)
"""A_ub = []
for line in A :
#print(line)
A_ub += line"""
return c, A_ub, [max_steps]
# Go through the landmarks and force the optimizer to use landmarks where attractiveness is set to -1
def respect_user_mustsee(landmarks: List[Landmark], A_eq: list, b_eq: list) :
L = len(landmarks)
H = 0 # sort of heuristic to get an idea of the number of steps needed
elem_prev = landmarks[0]
for i, elem in enumerate(landmarks) :
if elem.must_do is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
for j in range(L) : # sets the horizontal ones (go from)
l[j +i*L] = 1 # sets the vertical ones (go to) double check if good
for k in range(L-1) :
l[k*L+L-1] = 1
# Constraint to tie the problem together. Necessary but not sufficient to avoid circles
def respect_order(N: int, A_eq, b_eq):
for i in range(N-1) : # Prevent stacked ones
if i == 0 or i == N-1: # Don't touch start or finish
continue
else :
l = [0]*N
l[i] = -1
l = l*N
for j in range(N) :
l[i*N + j] = 1
A_eq = np.vstack((A_eq,l))
b_eq.append(2)
b_eq.append(0)
d, t = get_distance(elem.location, elem_prev.location)
H += t
elem_prev = elem
return A_eq, b_eq
return A_eq, b_eq, H
# Computes the path length given path matrix (dist_table) and a result
def path_length(P: list, resx: list) :
return np.dot(P, resx)
def add_time_to_reach(order: List[Landmark], landmarks: List[Landmark])->List[Landmark] :
j = 0
L = []
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour = parameters['detour factor']
speed = parameters['average walking speed']
prev = landmarks[0]
while(len(L) != len(order)) :
elem = landmarks[order[j]]
if elem != prev :
elem.time_to_reach = get_distance(elem.location, prev.location, detour, speed)[1]
L.append(elem)
prev = elem
j += 1
return L
# Main optimization pipeline
def solve_optimization (landmarks :List[Landmark], max_steps: int, printing_details: bool) :
N = len(landmarks)
L = len(landmarks)
# SET CONSTRAINTS FOR INEQUALITY
c, A_ub, b_ub = init_ub_dist(landmarks, max_steps) # Add the distances from each landmark to the other
P = A_ub # store the paths for later. Needed to compute path length
A_ub, b_ub = respect_number(N, A_ub, b_ub) # Respect max number of visits (no more possible stops than landmarks).
# TODO : Problems with circular symmetry
A_ub, b_ub = break_sym(N, A_ub, b_ub) # break the symmetry. Only use the upper diagonal values
c, A_ub, b_ub = init_ub_dist(landmarks, max_steps) # Add the distances from each landmark to the other
A_ub, b_ub = respect_number(L, A_ub, b_ub) # Respect max number of visits (no more possible stops than landmarks).
A_ub, b_ub = break_sym(L, A_ub, b_ub) # break the 'zig-zag' symmetry
# SET CONSTRAINTS FOR EQUALITY
A_eq, b_eq = init_eq_not_stay(N) # Force solution not to stay in same place
A_eq, b_eq, H = respect_user_mustsee(landmarks, A_eq, b_eq) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq, b_eq = respect_start_finish(N, A_eq, b_eq) # Force start and finish positions
A_eq, b_eq = respect_order(N, A_eq, b_eq) # Respect order of visit (only works when max_steps is limiting factor)
A_eq, b_eq = init_eq_not_stay(L) # Force solution not to stay in same place
A_eq, b_eq = respect_user_mustsee(landmarks, A_eq, b_eq) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq, b_eq = respect_start_finish(L, A_eq, b_eq) # Force start and finish positions
A_eq, b_eq = respect_order(L, A_eq, b_eq) # Respect order of visit (only works when max_steps is limiting factor)
# Bounds for variables (x can only be 0 or 1)
x_bounds = [(0, 1)] * len(c)
# SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
x_bounds = [(0, 1)]*L*L
# Solve linear programming problem
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
# Raise error if no solution is found
if not res.success :
raise ArithmeticError("No solution could be found, the problem is overconstrained. Please adapt your must_dos")
# Override the max_steps using the heuristic
for i, val in enumerate(b_ub) :
if val == max_steps : b_ub[i] = H
# Solve problem again :
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
if not res.success :
s = "No solution could be found, even when increasing max_steps using the heuristic"
return s
#raise ValueError("No solution could be found, even when increasing max_steps using the heuristic")
# If there is a solution, we're good to go, just check for
# If there is a solution, we're good to go, just check for connectiveness
else :
t, order, circle = is_connected(res.x, landmarks)
order, circle = is_connected(res.x)
i = 0
# Break the circular symmetry if needed
while len(circle) != 0 :
A_ub, b_ub = prevent_circle(res.x, landmarks, A_ub, b_ub)
A_ub, b_ub = break_circle(len(landmarks), A_ub, b_ub, circle)
A_ub, b_ub = break_circle2(order, landmarks, A_ub, b_ub)
timeout = 300
while len(circle) != 0 and i < timeout:
A_ub, b_ub = prevent_config(res.x, A_ub, b_ub)
A_ub, b_ub = break_cricle(order, len(landmarks), A_ub, b_ub)
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
t, order, circle = is_connected(res.x, landmarks)
if t :
order, circle = is_connected(res.x)
if len(circle) == 0 :
# Add the times to reach and stop optimizing
L = add_time_to_reach(order, landmarks)
break
#circle = has_circle(res.x)
print(i)
i += 1
t, order, [] = is_connected(res.x, landmarks)
prev = landmarks[order[0]]
i = 0
L = []
#prev = landmarks[order[i]]
while(len(L) != len(order)) :
elem = landmarks[order[i]]
if elem != prev :
d, t = get_distance(elem.location, prev.location)
elem.time_to_reach = t
L.append(elem)
prev = elem
i += 1
if i == timeout :
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
if printing_details is True :
if i != 0 :
print(f"Neded to recompute paths {i} times because of unconnected loops...")
print_res(L, landmarks, P)
print(np.dot(P, res.x))
print_res(L, len(landmarks))
print("\nTotal score : " + str(int(-res.fun)))
return L

View File

@ -0,0 +1,8 @@
{
"city bbox side" : 10,
"radius close to" : 27.5,
"church coeff" : 0.6,
"park coeff" : 1.4,
"tag coeff" : 100,
"N important" : 30
}

View File

@ -0,0 +1,4 @@
{
"detour factor" : 10,
"average walking speed" : 27.5
}

View File

@ -1,13 +1,9 @@
from typing import Optional
from pydantic import BaseModel
from OSMPythonTools.api import Api
from .landmarktype import LandmarkType
from .preferences import Preferences
class LandmarkTest(BaseModel) :
name : str
attractiveness : int
loc : tuple
from .landmarktype import LandmarkType
# Output to frontend
class Landmark(BaseModel) :

View File

@ -1,11 +1,13 @@
import pandas as pd
from optimizer import solve_optimization
from typing import List
from landmarks_manager import generate_landmarks
from fastapi.encoders import jsonable_encoder
from optimizer import solve_optimization
from structs.landmarks import Landmark
from structs.landmarktype import LandmarkType
from structs.preferences import Preferences, Preference
from fastapi.encoders import jsonable_encoder
from typing import List
# Helper function to create a .txt file with results
@ -37,17 +39,24 @@ def test3(city_country: str) -> List[Landmark]:
type=LandmarkType(landmark_type='shopping'),
score = 5))
coords = None
coordinates = None
landmarks = generate_landmarks(preferences=preferences, city_country=city_country, coordinates=coords)
landmarks, landmarks_short = generate_landmarks(preferences=preferences, city_country=city_country, coordinates=coordinates)
max_steps = 9
#write_data(landmarks)
visiting_order = solve_optimization(landmarks, max_steps, True)
start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=(48.2044576, 16.3870242), osm_type='start', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.2044576, 16.3870242), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
print(len(visiting_order))
test = landmarks_short
test.insert(0, start)
test.append(finish)
return len(visiting_order)
max_walking_time = 2 # hours
visiting_list = solve_optimization(test, max_walking_time*60, True)
def test4(coordinates: tuple[float, float]) -> List[Landmark]:
@ -77,7 +86,6 @@ def test4(coordinates: tuple[float, float]) -> List[Landmark]:
start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=(48.8375946, 2.2949904), osm_type='start', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.8375946, 2.2949904), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
test = landmarks_short
test.insert(0, start)
@ -91,4 +99,5 @@ def test4(coordinates: tuple[float, float]) -> List[Landmark]:
return visiting_list
test4(tuple((48.8795156, 2.3660204)))
test4(tuple((48.8795156, 2.3660204)))
#test3('Vienna, Austria')