style corrections, documentation, duplicate removal, flow improvement

This commit is contained in:
2024-07-25 17:15:18 +02:00
parent 80b3d5b012
commit 2863c99d7c
12 changed files with 314 additions and 348 deletions

View File

@@ -0,0 +1,39 @@
import yaml
from geopy.distance import geodesic
import constants
with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
DETOUR_FACTOR = parameters['detour_factor']
AVERAGE_WALKING_SPEED = parameters['average_walking_speed']
def get_time(p1: tuple[float, float], p2: tuple[float, float]) -> int:
"""
Calculate the time in minutes to travel from one location to another.
Args:
p1 (Tuple[float, float]): Coordinates of the starting location.
p2 (Tuple[float, float]): Coordinates of the destination.
detour (float): Detour factor affecting the distance.
speed (float): Walking speed in kilometers per hour.
Returns:
int: Time to travel from p1 to p2 in minutes.
"""
# Compute the straight-line distance in km
if p1 == p2 :
return 0
else:
dist = geodesic(p1, p2).kilometers
# Consider the detour factor for average cityto deterline walking distance (in km)
walk_dist = dist*DETOUR_FACTOR
# Time to walk this distance (in minutes)
walk_time = walk_dist/AVERAGE_WALKING_SPEED*60
return round(walk_time)

View File

@@ -0,0 +1,365 @@
import math as m
import yaml
import logging
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
from pywikibot import ItemPage, Site
from pywikibot import config
config.put_throttle = 0
config.maxlag = 0
from structs.preferences import Preferences, Preference
from structs.landmark import Landmark
from .take_most_important import take_most_important
import constants
SIGHTSEEING = 'sightseeing'
NATURE = 'nature'
SHOPPING = 'shopping'
class LandmarkManager:
logger = logging.getLogger(__name__)
city_bbox_side: int # bbox side in meters
radius_close_to: int # radius in meters
church_coeff: float # coeff to adjsut score of churches
park_coeff: float # coeff to adjust score of parks
tag_coeff: float # coeff to adjust weight of tags
N_important: int # number of important landmarks to consider
def __init__(self) -> None:
with constants.AMENITY_SELECTORS_PATH.open('r') as f:
self.amenity_selectors = yaml.safe_load(f)
with constants.LANDMARK_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.city_bbox_side = parameters['city_bbox_side']
self.radius_close_to = parameters['radius_close_to']
self.church_coeff = parameters['church_coeff']
self.park_coeff = parameters['park_coeff']
self.tag_coeff = parameters['tag_coeff']
self.N_important = parameters['N_important']
self.overpass = Overpass()
CachingStrategy.use(JSON, cacheDir=constants.OSM_CACHE_DIR)
def generate_landmarks_list(self, center_coordinates: tuple[float, float], preferences: Preferences) -> tuple[list[Landmark], list[Landmark]]:
"""
Generate and prioritize a list of landmarks based on user preferences.
This method fetches landmarks from various categories (sightseeing, nature, shopping) based on the user's preferences
and current location. It scores and corrects these landmarks, removes duplicates, and then selects the most important
landmarks based on a predefined criterion.
Parameters:
center_coordinates (tuple[float, float]): The latitude and longitude of the center location around which to search.
preferences (Preferences): The user's preference settings that influence the landmark selection.
Returns:
tuple[list[Landmark], list[Landmark]]:
- A list of all existing landmarks.
- A list of the most important landmarks based on the user's preferences.
"""
L = []
bbox = self.create_bbox(center_coordinates)
# list for sightseeing
if preferences.sightseeing.score != 0:
score_function = lambda loc, n_tags: int((self.count_elements_close_to(loc) + ((n_tags**1.2)*self.tag_coeff) )*self.church_coeff)
L1 = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], SIGHTSEEING, score_function)
self.correct_score(L1, preferences.sightseeing)
L += L1
# list for nature
if preferences.nature.score != 0:
score_function = lambda loc, n_tags: int((self.count_elements_close_to(loc) + ((n_tags**1.2)*self.tag_coeff) )*self.park_coeff)
L2 = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], NATURE, score_function)
self.correct_score(L2, preferences.nature)
L += L2
# list for shopping
if preferences.shopping.score != 0:
score_function = lambda loc, n_tags: int(self.count_elements_close_to(loc) + ((n_tags**1.2)*self.tag_coeff))
L3 = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], SHOPPING, score_function)
self.correct_score(L3, preferences.shopping)
L += L3
L = self.remove_duplicates(L)
L_constrained = take_most_important(L, self.N_important)
self.logger.info(f'Generated {len(L)} landmarks around {center_coordinates}, and constrained to {len(L_constrained)} most important ones.')
return L, L_constrained
def remove_duplicates(self, landmarks: list[Landmark]) -> list[Landmark]:
"""
Removes duplicate landmarks based on their names from the given list. Only retains the landmark with highest score
Parameters:
landmarks (list[Landmark]): A list of Landmark objects.
Returns:
list[Landmark]: A list of unique Landmark objects based on their names.
"""
L_clean = []
names = []
for landmark in landmarks:
if landmark.name in names:
continue
else:
names.append(landmark.name)
L_clean.append(landmark)
return L_clean
def correct_score(self, landmarks: list[Landmark], preference: Preference):
"""
Adjust the attractiveness score of each landmark in the list based on user preferences.
This method updates the attractiveness of each landmark by scaling it according to the user's preference score.
The score adjustment is computed using a simple linear transformation based on the preference score.
Args:
landmarks (list[Landmark]): A list of landmarks whose scores need to be corrected.
preference (Preference): The user's preference settings that influence the attractiveness score adjustment.
Raises:
TypeError: If the type of any landmark in the list does not match the expected type in the preference.
"""
if len(landmarks) == 0:
return
if landmarks[0].type != preference.type:
raise TypeError(f"LandmarkType {preference.type} does not match the type of Landmark {landmarks[0].name}")
for elem in landmarks:
elem.attractiveness = int(elem.attractiveness*preference.score/5) # arbitrary computation
def count_elements_close_to(self, coordinates: tuple[float, float]) -> int:
"""
Count the number of OpenStreetMap elements (nodes, ways, relations) within a specified radius of the given location.
This function constructs a bounding box around the specified coordinates based on the radius. It then queries
OpenStreetMap data to count the number of elements within that bounding box.
Args:
coordinates (tuple[float, float]): The latitude and longitude of the location to search around.
Returns:
int: The number of elements (nodes, ways, relations) within the specified radius. Returns 0 if no elements
are found or if an error occurs during the query.
"""
lat = coordinates[0]
lon = coordinates[1]
radius = self.radius_close_to
alpha = (180*radius) / (6371000*m.pi)
bbox = {'latLower':lat-alpha,'lonLower':lon-alpha,'latHigher':lat+alpha,'lonHigher': lon+alpha}
# Build the query to find elements within the radius
radius_query = overpassQueryBuilder(
bbox=[bbox['latLower'],
bbox['lonLower'],
bbox['latHigher'],
bbox['lonHigher']],
elementType=['node', 'way', 'relation']
)
try:
radius_result = self.overpass.query(radius_query)
N_elem = radius_result.countWays() + radius_result.countRelations()
self.logger.debug(f"There are {N_elem} ways/relations within 50m")
if N_elem is None:
return 0
return N_elem
except:
return 0
def create_bbox(self, coordinates: tuple[float, float]) -> tuple[float, float, float, float]:
"""
Create a bounding box around the given coordinates.
Args:
coordinates (tuple[float, float]): The latitude and longitude of the center of the bounding box.
Returns:
tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude
defining the bounding box.
"""
lat = coordinates[0]
lon = coordinates[1]
# Half the side length in km (since it's a square bbox)
half_side_length_km = self.city_bbox_side / 2 / 1000
# Convert distance to degrees
lat_diff = half_side_length_km / 111 # 1 degree latitude is approximately 111 km
lon_diff = half_side_length_km / (111 * m.cos(m.radians(lat))) # Adjust for longitude based on latitude
# Calculate bbox
min_lat = lat - lat_diff
max_lat = lat + lat_diff
min_lon = lon - lon_diff
max_lon = lon + lon_diff
return min_lat, min_lon, max_lat, max_lon
def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, score_function: callable) -> list[Landmark]:
"""
Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates.
Args:
bbox (tuple[float, float, float, float]): The bounding box coordinates (min_lat, min_lon, max_lat, max_lon).
amenity_selector (dict): The Overpass API query selector for the desired landmark type.
landmarktype (str): The type of the landmark (e.g., 'sightseeing', 'nature', 'shopping').
score_function (callable): The function to compute the score of the landmark based on its attributes.
Returns:
list[Landmark]: A list of Landmark objects that were fetched and filtered based on the provided criteria.
Notes:
- Landmarks are fetched using Overpass API queries.
- Selectors are translated from the dictionary to the Overpass query format. (e.g., 'amenity'='place_of_worship')
- Landmarks are filtered based on various conditions including tags and type.
- Scores are assigned to landmarks based on their attributes and surrounding elements.
"""
return_list = []
# caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously
# we need to split the selectors into separate queries and merge the results
for sel in dict_to_selector_list(amenity_selector):
self.logger.debug(f"Current selector: {sel}")
query = overpassQueryBuilder(
bbox = bbox,
elementType = ['way', 'relation'],
selector = sel,
# conditions = [],
includeCenter = True,
out = 'body'
)
try:
result = self.overpass.query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
return
for elem in result.elements():
name = elem.tag('name') # Add name
location = (elem.centerLat(), elem.centerLon()) # Add coordinates (lat, lon)
# TODO: exclude these from the get go
# skip if unprecise location
if name is None or location[0] is None:
continue
# skip if unused
if 'disused:leisure' in elem.tags().keys():
continue
# skip if part of another building
if 'building:part' in elem.tags().keys() and elem.tag('building:part') == 'yes':
continue
osm_type = elem.type() # Add type: 'way' or 'relation'
osm_id = elem.id() # Add OSM id
elem_type = landmarktype # Add the landmark type as 'sightseeing,
n_tags = len(elem.tags().keys()) # Add number of tags
# remove specific tags
skip = False
for tag in elem.tags().keys():
if "pay" in tag:
n_tags -= 1 # discard payment options for tags
if "disused" in tag:
skip = True # skip disused amenities
break
if "wikipedia" in tag:
n_tags += 3 # wikipedia entries count more
if tag == "wikidata":
Q = elem.tag('wikidata')
site = Site("wikidata", "wikidata")
item = ItemPage(site, Q)
item.get()
n_languages = len(item.labels)
n_tags += n_languages/10
if elem_type != "nature":
if "leisure" in tag and elem.tag('leisure') == "park":
elem_type = "nature"
if landmarktype != SHOPPING:
if "shop" in tag:
skip = True
break
if tag == "building" and elem.tag('building') in ['retail', 'supermarket', 'parking']:
skip = True
break
if skip:
continue
score = score_function(location, n_tags)
if score != 0:
# Generate the landmark and append it to the list
landmark = Landmark(
name=name,
type=elem_type,
location=location,
osm_type=osm_type,
osm_id=osm_id,
attractiveness=score,
must_do=False,
n_tags=int(n_tags)
)
return_list.append(landmark)
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
return return_list
def dict_to_selector_list(d: dict) -> list:
"""
Convert a dictionary of key-value pairs to a list of Overpass query strings.
Args:
d (dict): A dictionary of key-value pairs representing the selector.
Returns:
list: A list of strings representing the Overpass query selectors.
"""
return_list = []
for key, value in d.items():
if type(value) == list:
val = '|'.join(value)
return_list.append(f'{key}~"{val}"')
elif type(value) == str and len(value) == 0:
return_list.append(f'{key}')
else:
return_list.append(f'{key}={value}')
return return_list

View File

@@ -0,0 +1,519 @@
import yaml, logging
import numpy as np
from scipy.optimize import linprog
from collections import defaultdict, deque
from geopy.distance import geodesic
from structs.landmark import Landmark
from .get_time_separation import get_time
import constants
class Optimizer:
logger = logging.getLogger(__name__)
detour: int = None # accepted max detour time (in minutes)
detour_factor: float # detour factor of straight line vs real distance in cities
average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit
def __init__(self) :
# load parameters from file
with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.detour_factor = parameters['detour_factor']
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks = parameters['max_landmarks']
# Prevent the use of a particular solution
def prevent_config(self, resx):
"""
Prevent the use of a particular solution by adding constraints to the optimization.
Args:
resx (list[float]): List of edge weights.
Returns:
Tuple[list[int], list[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector.
"""
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # Number of edges
L = int(np.sqrt(N)) # Number of landmarks
nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
vertices_visited = ind_a
vertices_visited.remove(0)
ones = [1]*L
h = [0]*N
for i in range(L) :
if i in vertices_visited :
h[i*L:i*L+L] = ones
return h, [len(vertices_visited)-1]
# Prevents the creation of the same circle (both directions)
def prevent_circle(self, circle_vertices: list, L: int) :
"""
Prevent circular paths by by adding constraints to the optimization.
Args:
circle_vertices (list): List of vertices forming a circle.
L (int): Number of landmarks.
Returns:
Tuple[np.ndarray, list[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector.
"""
l1 = [0]*L*L
l2 = [0]*L*L
for i, node in enumerate(circle_vertices[:-1]) :
next = circle_vertices[i+1]
l1[node*L + next] = 1
l2[next*L + node] = 1
s = circle_vertices[0]
g = circle_vertices[-1]
l1[g*L + s] = 1
l2[s*L + g] = 1
return np.vstack((l1, l2)), [0, 0]
def is_connected(self, resx) :
"""
Determine the order of visits and detect any circular paths in the given configuration.
Args:
resx (list): List of edge weights.
Returns:
Tuple[list[int], Optional[list[list[int]]]]: A tuple containing the visit order and a list of any detected circles.
"""
# first round the results to have only 0-1 values
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
ind_b = nonzero_tup[1].tolist()
# Step 1: Create a graph representation
graph = defaultdict(list)
for a, b in zip(ind_a, ind_b):
graph[a].append(b)
# Step 2: Function to perform BFS/DFS to extract journeys
def get_journey(start):
journey_nodes = []
visited = set()
stack = deque([start])
while stack:
node = stack.pop()
if node not in visited:
visited.add(node)
journey_nodes.append(node)
for neighbor in graph[node]:
if neighbor not in visited:
stack.append(neighbor)
return journey_nodes
# Step 3: Extract all journeys
all_journeys_nodes = []
visited_nodes = set()
for node in ind_a:
if node not in visited_nodes:
journey_nodes = get_journey(node)
all_journeys_nodes.append(journey_nodes)
visited_nodes.update(journey_nodes)
for l in all_journeys_nodes :
if 0 in l :
order = l
all_journeys_nodes.remove(l)
break
if len(all_journeys_nodes) == 0 :
return order, None
return order, all_journeys_nodes
def init_ub_dist(self, landmarks: list[Landmark], max_steps: int):
"""
Initialize the objective function coefficients and inequality constraints for the optimization problem.
This function computes the distances between all landmarks and stores their attractiveness to maximize sightseeing.
The goal is to maximize the objective function subject to the constraints A*x < b and A_eq*x = b_eq.
Args:
landmarks (list[Landmark]): List of landmarks.
max_steps (int): Maximum number of steps allowed.
Returns:
Tuple[list[float], list[float], list[int]]: Objective function coefficients, inequality constraint coefficients, and the right-hand side of the inequality constraint.
"""
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = []
# Coefficients of inequality constraints (left-hand side)
A_ub = []
for spot1 in landmarks :
dist_table = [0]*len(landmarks)
c.append(-spot1.attractiveness)
for j, spot2 in enumerate(landmarks) :
t = get_time(spot1.location, spot2.location)
dist_table[j] = t
closest = sorted(dist_table)[:22]
for i, dist in enumerate(dist_table) :
if dist not in closest :
dist_table[i] = 32700
A_ub += dist_table
c = c*len(landmarks)
return c, A_ub, [max_steps]
def respect_number(self, L: int):
"""
Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks.
Args:
L (int): Number of landmarks.
Returns:
Tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
ones = [1]*L
zeros = [0]*L
A = ones + zeros*(L-1)
b = [1]
for i in range(L-1) :
h_new = zeros*i + ones + zeros*(L-1-i)
A = np.vstack((A, h_new))
b.append(1)
A = np.vstack((A, ones*L))
b.append(self.max_landmarks+1)
return A, b
# Constraint to not have d14 and d41 simultaneously. Does not prevent cyclic paths with more elements
def break_sym(self, L):
"""
Generate constraints to prevent simultaneous travel between two landmarks in both directions.
Args:
L (int): Number of landmarks.
Returns:
Tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
A = [0]*L*L
b = [1]
for i, _ in enumerate(up_ind_x[1:]) :
l = [0]*L*L
if up_ind_x[i] != up_ind_y[i] :
l[up_ind_x[i]*L + up_ind_y[i]] = 1
l[up_ind_y[i]*L + up_ind_x[i]] = 1
A = np.vstack((A,l))
b.append(1)
return A, b
def init_eq_not_stay(self, L: int):
"""
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
Args:
L (int): Number of landmarks.
Returns:
Tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints.
"""
l = [0]*L*L
for i in range(L) :
for j in range(L) :
if j == i :
l[j + i*L] = 1
l = np.array(np.array(l), dtype=np.int8)
return [l], [0]
def respect_user_must_do(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_do'.
Returns:
Tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = [0]*L*L
b = [0]
for i, elem in enumerate(landmarks[1:]) :
if elem.must_do is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
l[i*L:i*L+L] = [1]*L # set mandatory departures from landmarks tagged as 'must_do'
A = np.vstack((A,l))
b.append(1)
return A, b
def respect_user_must_avoid(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_avoid' are skipped in the optimization.
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_avoid'.
Returns:
Tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = [0]*L*L
b = [0]
for i, elem in enumerate(landmarks[1:]) :
if elem.must_avoid is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
l[i*L:i*L+L] = [1]*L
A = np.vstack((A,l))
b.append(0) # prevent departures from landmarks tagged as 'must_do'
return A, b
# Constraint to ensure start at start and finish at goal
def respect_start_finish(self, L: int):
"""
Generate constraints to ensure that the optimization starts at the designated start landmark and finishes at the goal landmark.
Args:
L (int): Number of landmarks.
Returns:
Tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
l_start = [1]*L + [0]*L*(L-1) # sets departures only for start (horizontal ones)
l_start[L-1] = 0 # prevents the jump from start to finish
l_goal = [0]*L*L # sets arrivals only for finish (vertical ones)
l_L = [0]*L*(L-1) + [1]*L # prevents arrivals at start and departures from goal
for k in range(L-1) : # sets only vertical ones for goal (go to)
l_L[k*L] = 1
if k != 0 :
l_goal[k*L+L-1] = 1
A = np.vstack((l_start, l_goal))
b = [1, 1]
A = np.vstack((A,l_L))
b.append(0)
return A, b
def respect_order(self, L: int):
"""
Generate constraints to tie the optimization problem together and prevent stacked ones, although this does not fully prevent circles.
Args:
L (int): Number of landmarks.
Returns:
Tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
A = [0]*L*L
b = [0]
for i in range(L-1) : # Prevent stacked ones
if i == 0 or i == L-1: # Don't touch start or finish
continue
else :
l = [0]*L
l[i] = -1
l = l*L
for j in range(L) :
l[i*L + j] = 1
A = np.vstack((A,l))
b.append(0)
return A, b
def link_list(self, order: list[int], landmarks: list[Landmark])->list[Landmark] :
"""
Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times.
Args:
order (list[int]): List of indices representing the order of landmarks to visit.
landmarks (list[Landmark]): List of all landmarks.
Returns:
list[Landmark]]: The updated linked list of landmarks with travel times
"""
L = []
j = 0
while j < len(order)-1 :
# get landmarks involved
elem = landmarks[order[j]]
next = landmarks[order[j+1]]
# get attributes
elem.time_to_reach_next = get_time(elem.location, next.location)
elem.must_do = True
elem.location = (round(elem.location[0], 5), round(elem.location[1], 5))
elem.next_uuid = next.uuid
L.append(elem)
j += 1
next.location = (round(next.location[0], 5), round(next.location[1], 5))
next.must_do = True
L.append(next)
return L
# Main optimization pipeline
def solve_optimization(
self,
max_time: int,
landmarks: list[Landmark],
) -> list[Landmark]:
"""
Main optimization pipeline to solve the landmark visiting problem.
This method sets up and solves a linear programming problem with constraints to find an optimal tour of landmarks,
considering user-defined must-visit landmarks, start and finish points, and ensuring no cycles are present.
Args:
max_time (int): Maximum time allowed for the tour in minutes.
landmarks (list[Landmark]): List of landmarks to visit.
Returns:
list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found.
"""
L = len(landmarks)
# SET CONSTRAINTS FOR INEQUALITY
c, A_ub, b_ub = self.init_ub_dist(landmarks, max_time) # Add the distances from each landmark to the other
A, b = self.respect_number(L) # Respect max number of visits (no more possible stops than landmarks).
A_ub = np.vstack((A_ub, A), dtype=np.int16)
b_ub += b
A, b = self.break_sym(L) # break the 'zig-zag' symmetry
A_ub = np.vstack((A_ub, A), dtype=np.int16)
b_ub += b
# SET CONSTRAINTS FOR EQUALITY
A_eq, b_eq = self.init_eq_not_stay(L) # Force solution not to stay in same place
A, b = self.respect_user_must_do(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_user_must_avoid(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_start_finish(L) # Force start and finish positions
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_order(L) # Respect order of visit (only works when max_steps is limiting factor)
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
# SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
x_bounds = [(0, 1)]*L*L
# Solve linear programming problem
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
# Raise error if no solution is found
if not res.success :
raise ArithmeticError("No solution could be found, the problem is overconstrained. Please adapt your must_dos")
# If there is a solution, we're good to go, just check for connectiveness
order, circles = self.is_connected(res.x)
#nodes, edges = is_connected(res.x)
i = 0
timeout = 80
while circles is not None and i < timeout:
A, b = self.prevent_config(res.x)
A_ub = np.vstack((A_ub, A))
b_ub += b
#A_ub, b_ub = prevent_circle(order, len(landmarks), A_ub, b_ub)
for circle in circles :
A, b = self.prevent_circle(circle, L)
A_eq = np.vstack((A_eq, A))
b_eq += b
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
if not res.success :
raise ArithmeticError("Solving failed because of overconstrained problem")
return None
order, circles = self.is_connected(res.x)
#nodes, edges = is_connected(res.x)
if circles is None :
break
# print(i)
i += 1
if i == timeout :
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
#sort the landmarks in the order of the solution
tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, score: {int(-res.fun)}")
return tour

View File

@@ -0,0 +1,340 @@
import yaml, logging
from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull
from math import pi
from structs.landmark import Landmark
from . import take_most_important, get_time_separation
from .optimizer import Optimizer
import constants
class Refiner :
logger = logging.getLogger(__name__)
detour_factor: float # detour factor of straight line vs real distance in cities
detour_corridor_width: float # width of the corridor around the path
average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit
optimizer: Optimizer # optimizer object
def __init__(self, optimizer: Optimizer) :
self.optimizer = optimizer
# load parameters from file
with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.detour_factor = parameters['detour_factor']
self.detour_corridor_width = parameters['detour_corridor_width']
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks = parameters['max_landmarks'] + 4
def create_corridor(self, landmarks: list[Landmark], width: float) :
"""
Create a corridor around the path connecting the landmarks.
Args:
landmarks (list[Landmark]): the landmark path around which to create the corridor
width (float): Width of the corridor in meters.
Returns:
Geometry: A buffered geometry object representing the corridor around the path.
"""
corrected_width = (180*width)/(6371000*pi)
path = self.create_linestring(landmarks)
obj = buffer(path, corrected_width, join_style="mitre", cap_style="square", mitre_limit=2)
return obj
def create_linestring(self, tour: list[Landmark]) -> LineString :
"""
Create a `LineString` object from a tour.
Args:
tour (list[Landmark]): An ordered sequence of landmarks that represents the visiting order.
Returns:
LineString: A `LineString` object representing the path through the landmarks.
"""
points = []
for landmark in tour :
points.append(Point(landmark.location))
return LineString(points)
# Check if some coordinates are in area. Used for the corridor
def is_in_area(self, area: Polygon, coordinates) -> bool :
"""
Check if a given point is within a specified area.
Args:
area (Polygon): The polygon defining the area.
coordinates (tuple[float, float]): The coordinates of the point to check.
Returns:
bool: True if the point is within the area, otherwise False.
"""
point = Point(coordinates)
return point.within(area)
# Function to determine if two landmarks are close to each other
def is_close_to(self, location1: tuple[float], location2: tuple[float]):
"""
Determine if two locations are close to each other by rounding their coordinates to 3 decimal places.
Args:
location1 (tuple[float, float]): The coordinates of the first location.
location2 (tuple[float, float]): The coordinates of the second location.
Returns:
bool: True if the locations are within 0.001 degrees of each other, otherwise False.
"""
absx = abs(location1[0] - location2[0])
absy = abs(location1[1] - location2[1])
return absx < 0.001 and absy < 0.001
#return (round(location1[0], 3), round(location1[1], 3)) == (round(location2[0], 3), round(location2[1], 3))
def rearrange(self, tour: list[Landmark]) -> list[Landmark]:
"""
Rearrange landmarks to group nearby visits together.
This function reorders landmarks so that nearby landmarks are adjacent to each other in the list,
while keeping 'start' and 'finish' landmarks in their original positions.
Args:
tour (list[Landmark]): Ordered list of landmarks to be rearranged.
Returns:
list[Landmark]: The rearranged list of landmarks with grouped nearby visits.
"""
i = 1
while i < len(tour):
j = i+1
while j < len(tour):
if self.is_close_to(tour[i].location, tour[j].location) and tour[i].name not in ['start', 'finish'] and tour[j].name not in ['start', 'finish']:
# If they are not adjacent, move the j-th element to be adjacent to the i-th element
if j != i + 1:
tour.insert(i + 1, tour.pop(j))
break # Move to the next i-th element after rearrangement
j += 1
i += 1
return tour
def find_shortest_path_through_all_landmarks(self, landmarks: list[Landmark]) -> tuple[list[Landmark], Polygon]:
"""
Find the shortest path through all landmarks using a nearest neighbor heuristic.
This function constructs a path that starts from the 'start' landmark, visits all other landmarks in the order
of their proximity, and ends at the 'finish' landmark. It returns both the ordered list of landmarks and a
polygon representing the path.
Args:
landmarks (list[Landmark]): list of all landmarks including 'start' and 'finish'.
Returns:
tuple[list[Landmark], Polygon]: A tuple where the first element is the list of landmarks in the order they
should be visited, and the second element is a `Polygon` representing
the path connecting all landmarks.
"""
# Step 1: Find 'start' and 'finish' landmarks
start_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'start')
finish_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'finish')
start_landmark = landmarks[start_idx]
finish_landmark = landmarks[finish_idx]
# Step 2: Create a list of unvisited landmarks excluding 'start' and 'finish'
unvisited_landmarks = [lm for i, lm in enumerate(landmarks) if i not in [start_idx, finish_idx]]
# Step 3: Initialize the path with the 'start' landmark
path = [start_landmark]
coordinates = [landmarks[start_idx].location]
current_landmark = start_landmark
# Step 4: Use nearest neighbor heuristic to visit all landmarks
while unvisited_landmarks:
nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time_separation.get_time(current_landmark.location, lm.location))
path.append(nearest_landmark)
coordinates.append(nearest_landmark.location)
current_landmark = nearest_landmark
unvisited_landmarks.remove(nearest_landmark)
# Step 5: Finally add the 'finish' landmark to the path
path.append(finish_landmark)
coordinates.append(landmarks[finish_idx].location)
path_poly = Polygon(coordinates)
return path, path_poly
# Returns a list of minor landmarks around the planned path to enhance experience
def get_minor_landmarks(self, all_landmarks: list[Landmark], visited_landmarks: list[Landmark], width: float) -> list[Landmark] :
"""
Identify landmarks within a specified corridor that have not been visited yet.
This function creates a corridor around the path defined by visited landmarks and then finds landmarks that fall
within this corridor. It returns a list of these landmarks, excluding those already visited, sorted by their importance.
Args:
all_landmarks (list[Landmark]): list of all available landmarks.
visited_landmarks (list[Landmark]): list of landmarks that have already been visited.
width (float): Width of the corridor around the visited landmarks.
Returns:
list[Landmark]: list of important landmarks within the corridor that have not been visited yet.
"""
second_order_landmarks = []
visited_names = []
area = self.create_corridor(visited_landmarks, width)
for visited in visited_landmarks :
visited_names.append(visited.name)
for landmark in all_landmarks :
if self.is_in_area(area, landmark.location) and landmark.name not in visited_names:
second_order_landmarks.append(landmark)
return take_most_important.take_most_important(second_order_landmarks, len(visited_landmarks))
# Try fix the shortest path using shapely
def fix_using_polygon(self, tour: list[Landmark])-> list[Landmark] :
"""
Improve the tour path using geometric methods to ensure it follows a more optimal shape.
This function creates a polygon from the given tour and attempts to refine it using a concave hull. It reorders
the landmarks to fit within this refined polygon and adjusts the tour to ensure the 'start' landmark is at the
beginning. It also checks if the final polygon is simple and rearranges the tour if necessary.
Args:
tour (list[Landmark]): list of landmarks representing the current tour path.
Returns:
list[Landmark]: Refined list of landmarks in the order of visit to produce a better tour path.
"""
coords = []
coords_dict = {}
for landmark in tour :
coords.append(landmark.location)
if landmark.name != 'finish' :
coords_dict[landmark.location] = landmark
tour_poly = Polygon(coords)
better_tour_poly = tour_poly.buffer(0)
try :
xs, ys = better_tour_poly.exterior.xy
if len(xs) != len(tour) :
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
except :
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
# reverse the xs and ys
xs.reverse()
ys.reverse()
better_tour = [] # list of ordered visit
name_index = {} # Maps the name of a landmark to its index in the concave polygon
# Loop through the polygon and generate the better (ordered) tour
for i,x in enumerate(xs[:-1]) :
y = ys[i]
better_tour.append(coords_dict[tuple((x,y))])
name_index[coords_dict[tuple((x,y))].name] = i
# Scroll the list to have start in front again
start_index = name_index['start']
better_tour = better_tour[start_index:] + better_tour[:start_index]
# Append the finish back and correct the time to reach
better_tour.append(tour[-1])
# Rearrange only if polygon still not simple
if not better_tour_poly.is_simple :
better_tour = self.rearrange(better_tour)
return better_tour
def refine_optimization(
self,
all_landmarks: list[Landmark],
base_tour: list[Landmark],
max_time: int,
detour: int
) -> list[Landmark]:
"""
This is the second stage of the optimization. It refines the initial tour path by considering additional minor landmarks and optimizes the path.
This method evaluates the need for further optimization based on the initial tour. If a detour is required
it adds minor landmarks around the initial predicted path and solves a new optimization problem to find a potentially better
tour. It then links the new tour and adjusts it using a nearest neighbor heuristic and polygon-based methods to
ensure a valid path. The final tour is chosen based on the shortest distance.
Args:
all_landmarks (list[Landmark]): The full list of landmarks available for the optimization.
base_tour (list[Landmark]): The initial tour path to be refined.
max_time (int): The maximum time available for the tour in minutes.
detour (int): The maximum detour time allowed for the tour in minutes.
Returns:
list[Landmark]: The refined list of landmarks representing the optimized tour path.
"""
# No need to refine if no detour is taken
if detour == 0:
return base_tour
minor_landmarks = self.get_minor_landmarks(all_landmarks, base_tour, self.detour_corridor_width)
self.logger.info(f"Using {len(minor_landmarks)} minor landmarks around the predicted path")
# full set of visitable landmarks
full_set = base_tour[:-1] + minor_landmarks # create full set of possible landmarks (without finish)
full_set.append(base_tour[-1]) # add finish back
# get a new tour
new_tour = self.optimizer.solve_optimization(
max_time = max_time + detour,
landmarks = full_set
)
if new_tour is None:
self.logger.warning("No solution found for the refined tour. Returning the initial tour.")
new_tour = base_tour
# Find shortest path using the nearest neighbor heuristic
better_tour, better_poly = self.find_shortest_path_through_all_landmarks(new_tour)
# Fix the tour using Polygons if the path looks weird
if base_tour[0].location == base_tour[-1].location and not better_poly.is_valid :
better_tour = self.fix_using_polygon(better_tour)
return better_tour

View File

@@ -0,0 +1,38 @@
from structs.landmark import Landmark
def take_most_important(landmarks: list[Landmark], N_important) -> list[Landmark] :
L = len(landmarks)
L_copy = []
L_clean = []
scores = [0]*len(landmarks)
names = []
name_id = {}
for i, elem in enumerate(landmarks) :
if elem.name not in names :
names.append(elem.name)
name_id[elem.name] = [i]
L_copy.append(elem)
else :
name_id[elem.name] += [i]
scores = []
for j in name_id[elem.name] :
scores.append(L[j].attractiveness)
best_id = max(range(len(scores)), key=scores.__getitem__)
t = name_id[elem.name][best_id]
if t == i :
for old in L_copy :
if old.name == elem.name :
old.attractiveness = L[t].attractiveness
scores = [0]*len(L_copy)
for i, elem in enumerate(L_copy) :
scores[i] = elem.attractiveness
res = sorted(range(len(scores)), key = lambda sub: scores[sub])[-(N_important-L):]
for i, elem in enumerate(L_copy) :
if i in res :
L_clean.append(elem)
return L_clean