Compare commits: d02ba85c31 ... v0.1.1 (29 commits)
| SHA1 |
|---|
| 6f54522b8c |
| 080ecd28ae |
| 21706ea7e6 |
| 83c1533e78 |
| 1f4815c991 |
| 699737bc40 |
| 1240f86d6e |
| 2a5023df4b |
| 581644a108 |
| f48dcf80c2 |
| 757773f433 |
| 25c2b6b0d1 |
| b527318eec |
| f2943eb3ad |
| 2ac8499dfb |
| 4a904c3d3c |
| 978cae290b |
| bab6cfe74e |
| 71abeabbd2 |
| f64e60ddf6 |
| d6f723bee1 |
| a3243431e0 |
| 3605408ebb |
| 431ae7c670 |
| e612a82921 |
| 163e10032c |
| 06c01837cf |
| cd24ee4a67 |
| 85c69d5e01 |
@@ -28,7 +28,7 @@ jobs:
         working-directory: backend

       - name: Run Tests
-        run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=INFO
+        run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=DEBUG
         working-directory: backend

       - name: Upload HTML report
@@ -445,7 +445,9 @@ disable=raw-checker-failed,
        logging-fstring-interpolation,
        duplicate-code,
        relative-beyond-top-level,
-       invalid-name
+       invalid-name,
+       too-many-arguments,
+       too-many-positional-arguments

 # Enable the message, report, category or checker with the given id(s). You can
 # either give multiple identifier separated by comma (,) or put this option
@@ -21,7 +21,7 @@ shapely = "*"
 pymemcache = "*"
 fastapi-cli = "*"
 scikit-learn = "*"
-pyqt6 = "*"
 loki-logger-handler = "*"
 pulp = "*"
 scipy = "*"
+requests = "*"
backend/Pipfile.lock (generated, 195 lines changed)
@@ -1,7 +1,7 @@
 {
     "_meta": {
         "hash": {
-            "sha256": "16f9923498d0e5a9876794c398a589a38bf88c8326863e1c33b74f445d297cd6"
+            "sha256": "63a160ed81e40c9f26a524eb504d13adb73e007a5bf209b6a09b89bd72a1b556"
         },
         "pipfile-spec": 6,
         "requires": {},
@@ -30,6 +30,112 @@
            "markers": "python_version >= '3.9'",
            "version": "==4.8.0"
        },
+       "certifi": {
+           "hashes": [
+               "sha256:1275f7a45be9464efc1173084eaa30f866fe2e47d389406136d332ed4967ec56",
+               "sha256:b650d30f370c2b724812bee08008be0c4163b163ddaec3f2546c1caf65f191db"
+           ],
+           "markers": "python_version >= '3.6'",
+           "version": "==2024.12.14"
+       },
+       "charset-normalizer": {
+           "hashes": [
+               "sha256:0167ddc8ab6508fe81860a57dd472b2ef4060e8d378f0cc555707126830f2537",
+               "sha256:01732659ba9b5b873fc117534143e4feefecf3b2078b0a6a2e925271bb6f4cfa",
+               "sha256:01ad647cdd609225c5350561d084b42ddf732f4eeefe6e678765636791e78b9a",
+               "sha256:04432ad9479fa40ec0f387795ddad4437a2b50417c69fa275e212933519ff294",
+               "sha256:0907f11d019260cdc3f94fbdb23ff9125f6b5d1039b76003b5b0ac9d6a6c9d5b",
+               "sha256:0924e81d3d5e70f8126529951dac65c1010cdf117bb75eb02dd12339b57749dd",
+               "sha256:09b26ae6b1abf0d27570633b2b078a2a20419c99d66fb2823173d73f188ce601",
+               "sha256:09b5e6733cbd160dcc09589227187e242a30a49ca5cefa5a7edd3f9d19ed53fd",
+               "sha256:0af291f4fe114be0280cdd29d533696a77b5b49cfde5467176ecab32353395c4",
+               "sha256:0f55e69f030f7163dffe9fd0752b32f070566451afe180f99dbeeb81f511ad8d",
+               "sha256:1a2bc9f351a75ef49d664206d51f8e5ede9da246602dc2d2726837620ea034b2",
+               "sha256:22e14b5d70560b8dd51ec22863f370d1e595ac3d024cb8ad7d308b4cd95f8313",
+               "sha256:234ac59ea147c59ee4da87a0c0f098e9c8d169f4dc2a159ef720f1a61bbe27cd",
+               "sha256:2369eea1ee4a7610a860d88f268eb39b95cb588acd7235e02fd5a5601773d4fa",
+               "sha256:237bdbe6159cff53b4f24f397d43c6336c6b0b42affbe857970cefbb620911c8",
+               "sha256:28bf57629c75e810b6ae989f03c0828d64d6b26a5e205535585f96093e405ed1",
+               "sha256:2967f74ad52c3b98de4c3b32e1a44e32975e008a9cd2a8cc8966d6a5218c5cb2",
+               "sha256:2a75d49014d118e4198bcee5ee0a6f25856b29b12dbf7cd012791f8a6cc5c496",
+               "sha256:2bdfe3ac2e1bbe5b59a1a63721eb3b95fc9b6817ae4a46debbb4e11f6232428d",
+               "sha256:2d074908e1aecee37a7635990b2c6d504cd4766c7bc9fc86d63f9c09af3fa11b",
+               "sha256:2fb9bd477fdea8684f78791a6de97a953c51831ee2981f8e4f583ff3b9d9687e",
+               "sha256:311f30128d7d333eebd7896965bfcfbd0065f1716ec92bd5638d7748eb6f936a",
+               "sha256:329ce159e82018d646c7ac45b01a430369d526569ec08516081727a20e9e4af4",
+               "sha256:345b0426edd4e18138d6528aed636de7a9ed169b4aaf9d61a8c19e39d26838ca",
+               "sha256:363e2f92b0f0174b2f8238240a1a30142e3db7b957a5dd5689b0e75fb717cc78",
+               "sha256:3a3bd0dcd373514dcec91c411ddb9632c0d7d92aed7093b8c3bbb6d69ca74408",
+               "sha256:3bed14e9c89dcb10e8f3a29f9ccac4955aebe93c71ae803af79265c9ca5644c5",
+               "sha256:44251f18cd68a75b56585dd00dae26183e102cd5e0f9f1466e6df5da2ed64ea3",
+               "sha256:44ecbf16649486d4aebafeaa7ec4c9fed8b88101f4dd612dcaf65d5e815f837f",
+               "sha256:4532bff1b8421fd0a320463030c7520f56a79c9024a4e88f01c537316019005a",
+               "sha256:49402233c892a461407c512a19435d1ce275543138294f7ef013f0b63d5d3765",
+               "sha256:4c0907b1928a36d5a998d72d64d8eaa7244989f7aaaf947500d3a800c83a3fd6",
+               "sha256:4d86f7aff21ee58f26dcf5ae81a9addbd914115cdebcbb2217e4f0ed8982e146",
+               "sha256:5777ee0881f9499ed0f71cc82cf873d9a0ca8af166dfa0af8ec4e675b7df48e6",
+               "sha256:5df196eb874dae23dcfb968c83d4f8fdccb333330fe1fc278ac5ceeb101003a9",
+               "sha256:619a609aa74ae43d90ed2e89bdd784765de0a25ca761b93e196d938b8fd1dbbd",
+               "sha256:6e27f48bcd0957c6d4cb9d6fa6b61d192d0b13d5ef563e5f2ae35feafc0d179c",
+               "sha256:6ff8a4a60c227ad87030d76e99cd1698345d4491638dfa6673027c48b3cd395f",
+               "sha256:73d94b58ec7fecbc7366247d3b0b10a21681004153238750bb67bd9012414545",
+               "sha256:7461baadb4dc00fd9e0acbe254e3d7d2112e7f92ced2adc96e54ef6501c5f176",
+               "sha256:75832c08354f595c760a804588b9357d34ec00ba1c940c15e31e96d902093770",
+               "sha256:7709f51f5f7c853f0fb938bcd3bc59cdfdc5203635ffd18bf354f6967ea0f824",
+               "sha256:78baa6d91634dfb69ec52a463534bc0df05dbd546209b79a3880a34487f4b84f",
+               "sha256:7974a0b5ecd505609e3b19742b60cee7aa2aa2fb3151bc917e6e2646d7667dcf",
+               "sha256:7a4f97a081603d2050bfaffdefa5b02a9ec823f8348a572e39032caa8404a487",
+               "sha256:7b1bef6280950ee6c177b326508f86cad7ad4dff12454483b51d8b7d673a2c5d",
+               "sha256:7d053096f67cd1241601111b698f5cad775f97ab25d81567d3f59219b5f1adbd",
+               "sha256:804a4d582ba6e5b747c625bf1255e6b1507465494a40a2130978bda7b932c90b",
+               "sha256:807f52c1f798eef6cf26beb819eeb8819b1622ddfeef9d0977a8502d4db6d534",
+               "sha256:80ed5e856eb7f30115aaf94e4a08114ccc8813e6ed1b5efa74f9f82e8509858f",
+               "sha256:8417cb1f36cc0bc7eaba8ccb0e04d55f0ee52df06df3ad55259b9a323555fc8b",
+               "sha256:8436c508b408b82d87dc5f62496973a1805cd46727c34440b0d29d8a2f50a6c9",
+               "sha256:89149166622f4db9b4b6a449256291dc87a99ee53151c74cbd82a53c8c2f6ccd",
+               "sha256:8bfa33f4f2672964266e940dd22a195989ba31669bd84629f05fab3ef4e2d125",
+               "sha256:8c60ca7339acd497a55b0ea5d506b2a2612afb2826560416f6894e8b5770d4a9",
+               "sha256:91b36a978b5ae0ee86c394f5a54d6ef44db1de0815eb43de826d41d21e4af3de",
+               "sha256:955f8851919303c92343d2f66165294848d57e9bba6cf6e3625485a70a038d11",
+               "sha256:97f68b8d6831127e4787ad15e6757232e14e12060bec17091b85eb1486b91d8d",
+               "sha256:9b23ca7ef998bc739bf6ffc077c2116917eabcc901f88da1b9856b210ef63f35",
+               "sha256:9f0b8b1c6d84c8034a44893aba5e767bf9c7a211e313a9605d9c617d7083829f",
+               "sha256:aabfa34badd18f1da5ec1bc2715cadc8dca465868a4e73a0173466b688f29dda",
+               "sha256:ab36c8eb7e454e34e60eb55ca5d241a5d18b2c6244f6827a30e451c42410b5f7",
+               "sha256:b010a7a4fd316c3c484d482922d13044979e78d1861f0e0650423144c616a46a",
+               "sha256:b1ac5992a838106edb89654e0aebfc24f5848ae2547d22c2c3f66454daa11971",
+               "sha256:b7b2d86dd06bfc2ade3312a83a5c364c7ec2e3498f8734282c6c3d4b07b346b8",
+               "sha256:b97e690a2118911e39b4042088092771b4ae3fc3aa86518f84b8cf6888dbdb41",
+               "sha256:bc2722592d8998c870fa4e290c2eec2c1569b87fe58618e67d38b4665dfa680d",
+               "sha256:c0429126cf75e16c4f0ad00ee0eae4242dc652290f940152ca8c75c3a4b6ee8f",
+               "sha256:c30197aa96e8eed02200a83fba2657b4c3acd0f0aa4bdc9f6c1af8e8962e0757",
+               "sha256:c4c3e6da02df6fa1410a7680bd3f63d4f710232d3139089536310d027950696a",
+               "sha256:c75cb2a3e389853835e84a2d8fb2b81a10645b503eca9bcb98df6b5a43eb8886",
+               "sha256:c96836c97b1238e9c9e3fe90844c947d5afbf4f4c92762679acfe19927d81d77",
+               "sha256:d7f50a1f8c450f3925cb367d011448c39239bb3eb4117c36a6d354794de4ce76",
+               "sha256:d973f03c0cb71c5ed99037b870f2be986c3c05e63622c017ea9816881d2dd247",
+               "sha256:d98b1668f06378c6dbefec3b92299716b931cd4e6061f3c875a71ced1780ab85",
+               "sha256:d9c3cdf5390dcd29aa8056d13e8e99526cda0305acc038b96b30352aff5ff2bb",
+               "sha256:dad3e487649f498dd991eeb901125411559b22e8d7ab25d3aeb1af367df5efd7",
+               "sha256:dccbe65bd2f7f7ec22c4ff99ed56faa1e9f785482b9bbd7c717e26fd723a1d1e",
+               "sha256:dd78cfcda14a1ef52584dbb008f7ac81c1328c0f58184bf9a84c49c605002da6",
+               "sha256:e218488cd232553829be0664c2292d3af2eeeb94b32bea483cf79ac6a694e037",
+               "sha256:e358e64305fe12299a08e08978f51fc21fac060dcfcddd95453eabe5b93ed0e1",
+               "sha256:ea0d8d539afa5eb2728aa1932a988a9a7af94f18582ffae4bc10b3fbdad0626e",
+               "sha256:eab677309cdb30d047996b36d34caeda1dc91149e4fdca0b1a039b3f79d9a807",
+               "sha256:eb8178fe3dba6450a3e024e95ac49ed3400e506fd4e9e5c32d30adda88cbd407",
+               "sha256:ecddf25bee22fe4fe3737a399d0d177d72bc22be6913acfab364b40bce1ba83c",
+               "sha256:eea6ee1db730b3483adf394ea72f808b6e18cf3cb6454b4d86e04fa8c4327a12",
+               "sha256:f08ff5e948271dc7e18a35641d2f11a4cd8dfd5634f55228b691e62b37125eb3",
+               "sha256:f30bf9fd9be89ecb2360c7d94a711f00c09b976258846efe40db3d05828e8089",
+               "sha256:fa88b843d6e211393a37219e6a1c1df99d35e8fd90446f1118f4216e307e48cd",
+               "sha256:fc54db6c8593ef7d4b2a331b58653356cf04f67c960f584edb7c3d8c97e8f39e",
+               "sha256:fd4ec41f914fa74ad1b8304bbc634b3de73d2a0889bd32076342a573e0779e00",
+               "sha256:ffc9202a29ab3920fa812879e95a9e78b2465fd10be7fcbd042899695d75e616"
+           ],
+           "markers": "python_version >= '3.7'",
+           "version": "==3.4.1"
+       },
        "click": {
            "hashes": [
                "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2",
@@ -226,12 +332,12 @@
        },
        "pydantic": {
            "hashes": [
-               "sha256:278b38dbbaec562011d659ee05f63346951b3a248a6f3642e1bc68894ea2b4ff",
-               "sha256:4dd4e322dbe55472cb7ca7e73f4b63574eecccf2835ffa2af9021ce113c83c53"
+               "sha256:427d664bf0b8a2b34ff5dd0f5a18df00591adcee7198fbd71981054cef37b584",
+               "sha256:ca5daa827cce33de7a42be142548b0096bf05a7e7b365aebfa5f8eeec7128236"
            ],
            "index": "pypi",
            "markers": "python_version >= '3.8'",
-           "version": "==2.10.5"
+           "version": "==2.10.6"
        },
        "pydantic-core": {
            "hashes": [
@@ -356,64 +462,6 @@
            "markers": "python_version >= '3.7'",
            "version": "==4.0.0"
        },
-       "pyqt6": {
-           "hashes": [
-               "sha256:3a4354816f11e812b727206a9ea6e79ff3774f1bb7228ad4b9318442d2c64ff9",
-               "sha256:452bae5840077bf0f146c798d7777f70d7bdd0c7dcfa9ee7a415c1daf2d10038",
-               "sha256:48bace7b87676bba5e6114482f3a20ca20be90c7f261b5d340464313f5f2bf5e",
-               "sha256:6d8628de4c2a050f0b74462e4c9cb97f839bf6ffabbca91711722ffb281570d9",
-               "sha256:8c5c05f5fdff31a5887dbc29b27615b09df467631238d7b449283809ffca6228",
-               "sha256:a9913d479f1ffee804bf7f232079baea4fb4b221a8f4890117588917a54ea30d",
-               "sha256:cf7123caea14e7ecf10bd12cae48e8d9970ef7caf627bc7d7988b0baa209adb3"
-           ],
-           "index": "pypi",
-           "markers": "python_version >= '3.9'",
-           "version": "==6.8.0"
-       },
-       "pyqt6-qt6": {
-           "hashes": [
-               "sha256:006d786693d0511fbcf184a862edbd339c6ed1bb3bd9de363d73a19ed4b23dff",
-               "sha256:08065d595f1e6fc2dde9f4450eeff89082f4bad26f600a8e9b9cc5966716bfcf",
-               "sha256:1eb8460a1fdb38d0b2458c2974c01d471c1e59e4eb19ea63fc447aaba3ad530e",
-               "sha256:20843cb86bd94942d1cd99e39bf1aeabb875b241a35a8ab273e4bbbfa63776db",
-               "sha256:2f4b8b55b1414b93f340f22e8c88d25550efcdebc4b65a3927dd947b73bd4358",
-               "sha256:98aa99fe38ae68c5318284cd28f3479ba538c40bf6ece293980abae0925c1b24",
-               "sha256:9f3790c4ce4dc576e48b8718d55fb8743057e6cbd53a6ca1dd253ffbac9b7287",
-               "sha256:a8bc2ed4ee5e7c6ff4dd1c7db0b27705d151fee5dc232bbd1bf17618f937f515",
-               "sha256:d6ca5d2b9d2ec0ee4a814b2175f641a5c4299cb80b45e0f5f8356632663f89b3"
-           ],
-           "version": "==6.8.1"
-       },
-       "pyqt6-sip": {
-           "hashes": [
-               "sha256:14f95c6352e3b85dc26bf59cfbf77a470ecbd5fcdcf00af4b648f0e1b9eefb9e",
-               "sha256:15be741d1ae8c82bb7afe9a61f3cf8c50457f7d61229a1c39c24cd6e8f4d86dc",
-               "sha256:1d322ded1d1fea339cc6ac65b768e72c69c486eebb7db6ccde061b5786d74cc5",
-               "sha256:1ec52e962f54137a19208b6e95b6bd9f7a403eb25d7237768a99306cd9db26d1",
-               "sha256:1fb405615970e85b622b13b4cad140ff1e4182eb8334a0b27a4698e6217b89b0",
-               "sha256:22d66256b800f552ade51a463510bf905f3cb318aae00ff4288fae4de5d0e600",
-               "sha256:2ab85aaf155828331399c59ebdd4d3b0358e42c08250e86b43d56d9873df148a",
-               "sha256:3c269052c770c09b61fce2f2f9ea934a67dfc65f443d59629b4ccc8f89751890",
-               "sha256:5004514b08b045ad76425cf3618187091a668d972b017677b1b4b193379ef553",
-               "sha256:552ff8fdc41f5769d3eccc661f022ed496f55f6e0a214c20aaf56e56385d61b6",
-               "sha256:5643c92424fe62cb0b33378fef3d28c1525f91ada79e8a15bd9a05414a09503d",
-               "sha256:56ce0afb19cd8a8c63ff93ae506dffb74f844b88adaa6673ebc0dec43af48a76",
-               "sha256:57b5312ef13c1766bdf69b317041140b184eb24a51e1e23ce8fc5386ba8dffb2",
-               "sha256:5d7726556d1ca7a7ed78e19ba53285b64a2a8f6ad7ff4cb18a1832efca1a3102",
-               "sha256:69a879cfc94f4984d180321b76f52923861cd5bf4969aa885eef7591ee932517",
-               "sha256:6e6c1e2592187934f4e790c0c099d0033e986dcef7bdd3c06e3895ffa995e9fc",
-               "sha256:8b2ac36d6e04db6099614b9c1178a2f87788c7ffc3826571fb63d36ddb4c401d",
-               "sha256:8c207528992d59b0801458aa6fcff118e5c099608ef0fc6ff8bccbdc23f29c04",
-               "sha256:976c7758f668806d4df7a8853f390ac123d5d1f73591ed368bdb8963574ff589",
-               "sha256:accab6974b2758296400120fdcc9d1f37785b2ea2591f00656e1776f058ded6c",
-               "sha256:c1942e107b0243ced9e510d507e0f27aeea9d6b13e0a1b7c06fd52a62e0d41f7",
-               "sha256:c800db3464481e87b1d2b84523b075df1e8fc7856c6f9623dc243f89be1cb604",
-               "sha256:e996d320744ca8342cad6f9454345330d4f06bce129812d032bda3bad6967c5c",
-               "sha256:fa27b51ae4c7013b3700cf0ecf46907d1333ae396fc6511311920485cbce094b"
-           ],
-           "markers": "python_version >= '3.9'",
-           "version": "==13.9.1"
-       },
        "python-dotenv": {
            "hashes": [
                "sha256:e324ee90a023d808f1959c46bcbc04446a10ced277783dc6ee09987c37ec10ca",
@@ -479,6 +527,15 @@
            ],
            "version": "==6.0.2"
        },
+       "requests": {
+           "hashes": [
+               "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760",
+               "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"
+           ],
+           "index": "pypi",
+           "markers": "python_version >= '3.8'",
+           "version": "==2.32.3"
+       },
        "rich": {
            "hashes": [
                "sha256:439594978a49a09530cff7ebc4b5c7103ef57baf48d5ea3184f21d9a2befa098",
@@ -646,11 +703,11 @@
        },
        "starlette": {
            "hashes": [
-               "sha256:4daec3356fb0cb1e723a5235e5beaf375d2259af27532958e2d79df549dad9da",
-               "sha256:bba1831d15ae5212b22feab2f218bab6ed3cd0fc2dc1d4442443bb1ee52260e0"
+               "sha256:2cbcba2a75806f8a41c722141486f37c28e30a0921c5f6fe4346cb0dcee1302f",
+               "sha256:dfb6d332576f136ec740296c7e8bb8c8a7125044e7c6da30744718880cdd059d"
            ],
            "markers": "python_version >= '3.9'",
-           "version": "==0.45.2"
+           "version": "==0.45.3"
        },
        "threadpoolctl": {
            "hashes": [
@@ -676,6 +733,14 @@
            "markers": "python_version >= '3.8'",
            "version": "==4.12.2"
        },
+       "urllib3": {
+           "hashes": [
+               "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df",
+               "sha256:f8c5449b3cf0861679ce7e0503c7b44b5ec981bec0d1d3795a07f1ba96f0204d"
+           ],
+           "markers": "python_version >= '3.9'",
+           "version": "==2.3.0"
+       },
        "uvicorn": {
            "extras": [
                "standard"
File diff suppressed because one or more lines are too long
@@ -2,6 +2,7 @@

 import os
 from pathlib import Path
+from typing import List, Literal, Tuple


 LOCATION_PREFIX = Path('src')
@@ -14,6 +15,8 @@ OPTIMIZER_PARAMETERS_PATH = PARAMETERS_DIR / 'optimizer_parameters.yaml'
 cache_dir_string = os.getenv('OSM_CACHE_DIR', './cache')
 OSM_CACHE_DIR = Path(cache_dir_string)

+OSM_TYPES = List[Literal['way', 'node', 'relation']]
+BBOX = Tuple[float, float, float, float]

 MEMCACHED_HOST_PATH = os.getenv('MEMCACHED_HOST_PATH', None)
 if MEMCACHED_HOST_PATH == "none":
@@ -3,7 +3,7 @@
 import logging
 import time
 from contextlib import asynccontextmanager
-from fastapi import FastAPI, HTTPException, Query
+from fastapi import FastAPI, HTTPException, BackgroundTasks, Query

 from .logging_config import configure_logging
 from .structs.landmark import Landmark, Toilets
@@ -14,8 +14,10 @@ from .utils.landmarks_manager import LandmarkManager
 from .utils.toilets_manager import ToiletsManager
 from .optimization.optimizer import Optimizer
 from .optimization.refiner import Refiner
+from .overpass.overpass import fill_cache
+from .cache import client as cache_client


 logger = logging.getLogger(__name__)

 manager = LandmarkManager()
@@ -35,11 +37,11 @@ async def lifespan(app: FastAPI):
 app = FastAPI(lifespan=lifespan)



 @app.post("/trip/new")
 def new_trip(preferences: Preferences,
              start: tuple[float, float],
-             end: tuple[float, float] | None = None) -> Trip:
+             end: tuple[float, float] | None = None,
+             background_tasks: BackgroundTasks = None) -> Trip:
     """
     Main function to call the optimizer.

@@ -70,6 +72,7 @@ def new_trip(preferences: Preferences,
                              osm_type='start',
                              osm_id=0,
                              attractiveness=0,
+                             duration=0,
                              must_do=True,
                              n_tags = 0)

@@ -79,6 +82,7 @@ def new_trip(preferences: Preferences,
                            osm_type='end',
                            osm_id=0,
                            attractiveness=0,
+                           duration=0,
                            must_do=True,
                            n_tags=0)

@@ -89,6 +93,9 @@ def new_trip(preferences: Preferences,
                                         preferences = preferences
                                         )

+    if len(landmarks) == 0 :
+        raise HTTPException(status_code=500, detail="No landmarks were found.")
+
     # insert start and finish to the landmarks list
     landmarks_short.insert(0, start_landmark)
     landmarks_short.append(end_landmark)
@@ -112,6 +119,9 @@ def new_trip(preferences: Preferences,
         refined_tour = refiner.refine_optimization(landmarks, base_tour,
                                                    preferences.max_time_minute,
                                                    preferences.detour_tolerance_minute)
+    except TimeoutError as te :
+        logger.error(f'Refiner failed : {str(te)} Using base tour.')
+        refined_tour = base_tour
     except Exception as exc :
         raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(exc)}") from exc

@@ -125,6 +135,9 @@ def new_trip(preferences: Preferences,
     # upon creation of the trip, persistence of both the trip and its landmarks is ensured.
     trip = Trip.from_linked_landmarks(linked_tour, cache_client)
     logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.')

+    background_tasks.add_task(fill_cache)
+
     return trip
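The new `background_tasks` parameter relies on FastAPI's built-in injection: when an endpoint declares a `BackgroundTasks` argument, FastAPI supplies the instance, and a task queued with `add_task` runs only after the response has been sent, so the cache refill never delays the reply. A minimal sketch of the same pattern, with hypothetical endpoint and task names (not code from this repository):

    from fastapi import BackgroundTasks, FastAPI

    app = FastAPI()

    def refresh_cache() -> None:
        # stand-in for fill_cache(); executed after the response is returned
        print("cache refreshed")

    @app.post("/demo")
    def demo(background_tasks: BackgroundTasks) -> dict:
        background_tasks.add_task(refresh_cache)  # queued, not awaited
        return {"status": "ok"}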
@@ -55,6 +55,9 @@ class Optimizer:
         self.average_walking_speed = parameters['average_walking_speed']
         self.max_landmarks = parameters['max_landmarks']
         self.overshoot = parameters['overshoot']
+        self.time_limit = parameters['time_limit']
+        self.gap_rel = parameters['gap_rel']
+        self.max_iter = parameters['max_iter']


    def init_ub_time(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark], max_time: int):
@@ -87,7 +90,7 @@ class Optimizer:

         # inequality matrix and vector
         A_ub = np.zeros(L*L, dtype=np.int16)
-        b_ub = round(max_time*self.overshoot)
+        b_ub = round(max_time*(1.1+max_time*self.overshoot))

         for i, spot1 in enumerate(landmarks) :
             c[i] = spot1.attractiveness
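With the new formula the allowed overshoot grows with trip length instead of staying a fixed factor. Illustrative arithmetic only, using the overshoot value 0.0016 introduced later in this change set:

    max_time = 240                                 # minutes
    round(max_time * 1.1)                          # old bound with overshoot = 1.1 -> 264
    round(max_time * (1.1 + max_time * 0.0016))    # new bound -> round(240 * 1.484) = 356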
@@ -489,6 +492,28 @@ class Optimizer:
         return L


+    def warm_start(self, x: list[pl.LpVariable], L: int) :
+        """
+        This function sets the initial values of the decision variables to a feasible solution.
+        This can help the solver start with a feasible or heuristic solution,
+        potentially speeding up convergence.
+
+        Args:
+            x (list[pl.LpVariable]): A list of PuLP decision variables (binary variables).
+            L (int): The size parameter, representing a dimension (likely related to a grid or matrix).
+
+        Returns:
+            list[pl.LpVariable]: The modified list of PuLP decision variables with initial values set.
+        """
+        for i in range(L*L) :
+            x[i].setInitialValue(0)
+
+        x[1].setInitialValue(1)
+        x[2*L-1].setInitialValue(1)
+
+        return x
+
+
     def pre_processing(self, L: int, landmarks: list[Landmark], max_time: int, max_landmarks: int | None) :
         """
         Preprocesses the optimization problem by setting up constraints and variables for the tour optimization.
@@ -539,6 +564,7 @@ class Optimizer:
         self.respect_order(prob, x, L)                 # Respect order of visit (only works when max_time is limiting factor)
         self.respect_user_must(prob, x, L, landmarks)  # Force to do/avoid landmarks set by user.

+        # return prob, self.warm_start(x, L)
         return prob, x
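The commented-out return keeps the warm start available without enabling it. For CBC to honor values assigned via setInitialValue, the solver command must also be constructed with warmStart=True; a sketch under that assumption (the solver wiring below is not shown in this diff):

    prob, x = optimizer.pre_processing(L, landmarks, max_time, None)
    x = optimizer.warm_start(x, L)                            # seed a feasible starting point
    prob.solve(pl.PULP_CBC_CMD(msg=False, warmStart=True))    # warmStart=True tells CBC to use it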
@@ -561,7 +587,10 @@ class Optimizer:
         prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks)

         # Solve the problem and extract results.
-        prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1))
+        try :
+            prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=self.time_limit+1, gapRel=self.gap_rel))
+        except Exception as exc :
+            raise Exception(f"No solution found: {exc}") from exc
         status = pl.LpStatus[prob.status]
         solution = [pl.value(var) for var in x]  # The values of the decision variables (will be 0 or 1)

@@ -576,18 +605,21 @@ class Optimizer:
         circles = self.is_connected(solution)

         i = 0
-        timeout = 40
         while circles is not None :
             i += 1
-            if i == timeout :
-                self.logger.error(f'Timeout: No solution found after {timeout} iterations.')
-                raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
+            if i == self.max_iter :
+                self.logger.error(f'Timeout: No solution found after {self.max_iter} iterations.')
+                raise TimeoutError(f"Optimization took too long. No solution found after {self.max_iter} iterations.")

             for circle in circles :
                 self.prevent_circle(prob, x, circle, L)

             # Solve the problem again
-            prob.solve(pl.PULP_CBC_CMD(msg=False))
+            try :
+                prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=self.time_limit, gapRel=self.gap_rel))
+            except Exception as exc :
+                raise Exception(f"No solution found: {exc}") from exc

             solution = [pl.value(var) for var in x]

             if pl.LpStatus[prob.status] != 'Optimal' :
@@ -602,5 +634,5 @@ class Optimizer:
         order = self.get_order(solution)
         tour = [landmarks[i] for i in order]

-        self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
+        self.logger.info(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")

         return tour
@@ -1,9 +1,8 @@
 """Module defining the caching strategy for overpass requests."""
 import os
-import xml.etree.ElementTree as ET
 import json
 import hashlib

-from ..constants import OSM_CACHE_DIR
+from ..constants import OSM_CACHE_DIR, OSM_TYPES


 def get_cache_key(query: str) -> str:
@@ -17,10 +16,6 @@ def get_cache_key(query: str) -> str:
 class CachingStrategyBase:
     """
     Base class for implementing caching strategies.
-
-    This class defines the structure for a caching strategy with basic methods
-    that must be implemented by subclasses. Subclasses should define how to
-    retrieve, store, and close the cache.
     """
     def get(self, key):
         """Retrieve the cached data associated with the provided key."""
@@ -30,111 +25,108 @@ class CachingStrategyBase:
         """Store data in the cache with the specified key."""
         raise NotImplementedError('Subclass should implement set')

+    def set_hollow(self, key, **kwargs):
+        """Create a hollow (empty) cache entry with a specific key."""
+        raise NotImplementedError('Subclass should implement set_hollow')
+
     def close(self):
         """Clean up or close any resources used by the caching strategy."""


-class XMLCache(CachingStrategyBase):
+class JSONCache(CachingStrategyBase):
     """
-    A caching strategy that stores and retrieves data in XML format.
-
-    This class provides methods to cache data as XML files in a specified directory.
-    The directory is automatically suffixed with '_XML' to distinguish it from other
-    caching strategies. The data is stored and retrieved using XML serialization.
-
-    Args:
-        cache_dir (str): The base directory where XML cache files will be stored.
-            Defaults to 'OSM_CACHE_DIR' with a '_XML' suffix.
-
-    Methods:
-        get(key): Retrieve cached data from a XML file associated with the given key.
-        set(key, value): Store data in a XML file with the specified key.
+    A caching strategy that stores and retrieves data in JSON format.
     """
     def __init__(self, cache_dir=OSM_CACHE_DIR):
-        # Add the class name as a suffix to the directory
-        self._cache_dir = f'{cache_dir}_XML'
+        self._cache_dir = f'{cache_dir}'
         if not os.path.exists(self._cache_dir):
             os.makedirs(self._cache_dir)

     def _filename(self, key):
-        return os.path.join(self._cache_dir, f'{key}.xml')
+        return os.path.join(self._cache_dir, f'{key}.json')

     def get(self, key):
-        """Retrieve XML data from the cache and parse it as an ElementTree."""
+        """Retrieve JSON data from the cache and parse it as an ElementTree."""
         filename = self._filename(key)
         if os.path.exists(filename):
             try:
-                # Parse and return the cached XML data
-                tree = ET.parse(filename)
-                return tree.getroot()  # Return the root element of the parsed XML
-            except ET.ParseError:
-                # print(f"Error parsing cached XML file: {filename}")
-                return None
+                # Open and parse the cached JSON data
+                with open(filename, 'r', encoding='utf-8') as file:
+                    data = json.load(file)
+                # Return the data as a list of dicts.
+                return data
+            except json.JSONDecodeError:
+                return None  # Return None if parsing fails
         return None

     def set(self, key, value):
-        """Save the XML data as an ElementTree to the cache."""
+        """Save the JSON data as an ElementTree to the cache."""
         filename = self._filename(key)
-        tree = ET.ElementTree(value)  # value is expected to be an ElementTree root element
         try:
-            # Write the XML data to a file
-            with open(filename, 'wb') as file:
-                tree.write(file, encoding='utf-8', xml_declaration=True)
+            # Write the JSON data to the cache file
+            with open(filename, 'w', encoding='utf-8') as file:
+                json.dump(value, file, ensure_ascii=False, indent=4)
         except IOError as e:
             raise IOError(f"Error writing to cache file: {filename} - {e}") from e

+    def set_hollow(self, key, cell: tuple, osm_types: list,
+                   selector: str, conditions: list=None, out='center'):
+        """Create an empty placeholder cache entry for a future fill."""
+        hollow_key = f'hollow_{key}'
+        filename = self._filename(hollow_key)
+
+        # Create the hollow JSON structure
+        hollow_data = {
+            "key": key,
+            "cell": list(cell),
+            "osm_types": list(osm_types),
+            "selector": selector,
+            "conditions": conditions,
+            "out": out
+        }
+        # Write the hollow data to the cache file
+        try:
+            with open(filename, 'w', encoding='utf-8') as file:
+                json.dump(hollow_data, file, ensure_ascii=False, indent=4)
+        except IOError as e:
+            raise IOError(f"Error writing hollow cache to file: {filename} - {e}") from e
+
     def close(self):
         """Cleanup method, if needed."""
         pass

 class CachingStrategy:
     """
     A class to manage different caching strategies.

     This class provides an interface to switch between different caching strategies
     (e.g., XMLCache, JSONCache) dynamically. It allows caching data in different formats,
     depending on the strategy being used. By default, it uses the XMLCache strategy.

     Attributes:
         __strategy (CachingStrategyBase): The currently active caching strategy.
         __strategies (dict): A mapping between strategy names (as strings) and their corresponding
                              classes, allowing dynamic selection of caching strategies.
     """
-    __strategy = XMLCache()  # Default caching strategy
+    __strategy = JSONCache()  # Default caching strategy
     __strategies = {
-        'XML': XMLCache,
         'JSON': JSONCache,
     }

     @classmethod
-    def use(cls, strategy_name='XML', **kwargs):
-        """
-        Set the caching strategy based on the strategy_name provided.
-
-        Args:
-            strategy_name (str): The name of the caching strategy (e.g., 'XML').
-            **kwargs: Additional keyword arguments to pass when initializing the strategy.
-        """
-        # If a previous strategy exists, close it
+    def use(cls, strategy_name='JSON', **kwargs):
         if cls.__strategy:
             cls.__strategy.close()

         # Retrieve the strategy class based on the strategy name
         strategy_class = cls.__strategies.get(strategy_name)

         if not strategy_class:
             raise ValueError(f"Unknown caching strategy: {strategy_name}")

         # Instantiate the new strategy with the provided arguments
         cls.__strategy = strategy_class(**kwargs)
         return cls.__strategy

     @classmethod
     def get(cls, key):
         """Get data from the current strategy's cache."""
         if not cls.__strategy:
             raise RuntimeError("Caching strategy has not been set.")
         return cls.__strategy.get(key)

     @classmethod
     def set(cls, key, value):
         """Set data in the current strategy's cache."""
         if not cls.__strategy:
             raise RuntimeError("Caching strategy has not been set.")
         cls.__strategy.set(key, value)

+    @classmethod
+    def set_hollow(cls, key, cell: tuple, osm_types: OSM_TYPES,
+                   selector: str, conditions: list=None, out='center'):
+        """Create a hollow cache entry."""
+        cls.__strategy.set_hollow(key, cell, osm_types, selector, conditions, out)
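A brief usage sketch of the switcher after this change (the key and value are made up for illustration):

    CachingStrategy.use('JSON', cache_dir='./cache')                 # now also the default strategy
    CachingStrategy.set('example_key', [{'type': 'node', 'id': 1}])
    elements = CachingStrategy.get('example_key')                    # list of dicts, or None on a miss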
@@ -1,14 +1,17 @@
 """Module allowing connexion to overpass api and fectch data from OSM."""
-from typing import Literal, List
+import os
 import urllib
+import math
 import logging
-import xml.etree.ElementTree as ET
+import json
+from typing import List, Tuple

 from .caching_strategy import get_cache_key, CachingStrategy
-from ..constants import OSM_CACHE_DIR
+from ..constants import OSM_CACHE_DIR, OSM_TYPES, BBOX

-logger = logging.getLogger('Overpass')
-osm_types = List[Literal['way', 'node', 'relation']]
+RESOLUTION = 0.05
+CELL = Tuple[int, int]


 class Overpass :
@@ -16,7 +19,10 @@ class Overpass :
     Overpass class to manage the query building and sending to overpass api.
     The caching strategy is a part of this class and initialized upon creation of the Overpass object.
     """
-    def __init__(self, caching_strategy: str = 'XML', cache_dir: str = OSM_CACHE_DIR) :
+    logger = logging.getLogger(__name__)
+
+    def __init__(self, caching_strategy: str = 'JSON', cache_dir: str = OSM_CACHE_DIR) :
         """
         Initialize the Overpass instance with the url, headers and caching strategy.
         """
@@ -25,17 +31,109 @@ class Overpass :
         self.caching_strategy = CachingStrategy.use(caching_strategy, cache_dir=cache_dir)


-    @classmethod
-    def build_query(self, area: tuple, osm_types: osm_types,
-                    selector: str, conditions=[], out='center') -> str:
+    def send_query(self, bbox: BBOX, osm_types: OSM_TYPES,
+                   selector: str, conditions: list=None, out='center') -> List[dict]:
+        """
+        Sends the Overpass QL query to the Overpass API and returns the parsed json response.
+
+        Args:
+            bbox (tuple): Bounding box for the query.
+            osm_types (list[str]): List of OSM element types (e.g., 'node', 'way').
+            selector (str): Key or tag to filter OSM elements (e.g., 'highway').
+            conditions (list): Optional list of additional filter conditions in Overpass QL format.
+            out (str): Output format ('center', 'body', etc.). Defaults to 'center'.
+
+        Returns:
+            list: Parsed json response from the Overpass API, or cached data if available.
+        """
+        # Determine which grid cells overlap with this bounding box.
+        overlapping_cells = Overpass._get_overlapping_cells(bbox)
+
+        # Retrieve cached data and identify missing cache entries
+        cached_responses, non_cached_cells = self._retrieve_cached_data(overlapping_cells, osm_types, selector, conditions, out)
+
+        self.logger.info(f'Cache hit for {len(overlapping_cells)-len(non_cached_cells)}/{len(overlapping_cells)} quadrants.')
+
+        # If there is no missing data, return the cached responses after filtering.
+        if not non_cached_cells :
+            return Overpass._filter_landmarks(cached_responses, bbox)
+
+        # If there is no cached data, fetch all from Overpass.
+        elif not cached_responses :
+            query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
+            return self.fetch_data_from_api(query_str)
+
+        # Hybrid cache: some data from Overpass, some data from cache.
+        else :
+            # Resize the bbox for smaller search area and build new query string.
+            non_cached_bbox = Overpass._get_non_cached_bbox(non_cached_cells, bbox)
+            query_str = Overpass.build_query(non_cached_bbox, osm_types, selector, conditions, out)
+            non_cached_responses = self.fetch_data_from_api(query_str)
+            return Overpass._filter_landmarks(cached_responses, bbox) + non_cached_responses
+
+
+    def fetch_data_from_api(self, query_str: str) -> List[dict]:
+        """
+        Fetch data from the Overpass API and return the json data.
+
+        Args:
+            query_str (str): The Overpass query string.
+
+        Returns:
+            dict: Combined cached and fetched data.
+        """
+        try:
+            data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
+            request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
+
+            with urllib.request.urlopen(request) as response:
+                response_data = response.read().decode('utf-8')  # Convert the HTTPResponse to a string
+                data = json.loads(response_data)                 # Load the JSON from the string
+                elements = data.get('elements', [])
+                # self.logger.debug(f'Query = {query_str}')
+                return elements
+
+        except urllib.error.URLError as e:
+            self.logger.error(f"Error connecting to Overpass API: {e}")
+            raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
+        except Exception as exc :
+            raise Exception(f'An unexpected error occured: {str(exc)}') from exc
+
+
+    def fill_cache(self, json_data: dict) :
+        """
+        Fill cache with data by using a hollow cache entry's information.
+        """
+        query_str, cache_key = Overpass._build_query_from_hollow(json_data)
+        try:
+            data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
+            request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
+
+            with urllib.request.urlopen(request) as response:
+
+                # Convert the HTTPResponse to a string and load data
+                response_data = response.read().decode('utf-8')
+                data = json.loads(response_data)
+
+                # Get elements and set cache
+                elements = data.get('elements', [])
+                self.caching_strategy.set(cache_key, elements)
+                self.logger.debug(f'Cache set for {cache_key}')
+        except urllib.error.URLError as e:
+            raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
+        except Exception as exc :
+            raise Exception(f'An unexpected error occured: {str(exc)}') from exc
+
+
+    @staticmethod
+    def build_query(bbox: BBOX, osm_types: OSM_TYPES,
+                    selector: str, conditions: list=None, out='center') -> str:
         """
         Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data.

         Args:
-            area (tuple): A tuple representing the geographical search area, typically in the format
-                (radius, latitude, longitude). The first element is a string like "around:2000"
-                specifying the search radius, and the second and third elements represent
-                the latitude and longitude as floats or strings.
+            bbox (tuple): A tuple representing the geographical search area, typically in the format
+                (lat_min, lon_min, lat_max, lon_max).
             osm_types (list[str]): A list of OSM element types to search for. Must be one or more of
                 'Way', 'Node', or 'Relation'.
             selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.).
@@ -52,82 +150,203 @@ class Overpass :
         Notes:
             - If no conditions are provided, the query will just use the `selector` to filter the OSM
               elements without additional constraints.
-            - The search area must always formatted as "(radius, lat, lon)".
         """
-        if not isinstance(conditions, list) :
-            conditions = [conditions]
-        if not isinstance(osm_types, list) :
-            osm_types = [osm_types]
-        query = '('
+        query = '[out:json];('

-        # Round the radius to nearest 50 and coordinates to generate less queries
-        if area[0] > 500 :
-            search_radius = round(area[0] / 50) * 50
-            loc = tuple((round(area[1], 2), round(area[2], 2)))
-        else :
-            search_radius = round(area[0] / 25) * 25
-            loc = tuple((round(area[1], 3), round(area[2], 3)))
-
-        search_area = f"(around:{search_radius}, {str(loc[0])}, {str(loc[1])})"
+        # convert the bbox to string.
+        bbox_str = f"({','.join(map(str, bbox))})"

-        if conditions :
+        if conditions is not None and len(conditions) > 0:
             conditions = '(if: ' + ' && '.join(conditions) + ')'
         else :
             conditions = ''

         for elem in osm_types :
-            query += elem + '[' + selector + ']' + conditions + search_area + ';'
+            query += elem + '[' + selector + ']' + conditions + bbox_str + ';'

         query += ');' + f'out {out};'

         return query
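For illustration, a call such as the following (hypothetical arguments) produces a compact bbox-based query string:

    q = Overpass.build_query((48.85, 2.35, 48.9, 2.4), ['node'], 'tourism')
    # q == "[out:json];(node[tourism](48.85,2.35,48.9,2.4););out center;"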
-    def send_query(self, query: str) -> ET:
+    def _retrieve_cached_data(self, overlapping_cells: CELL, osm_types: OSM_TYPES,
+                              selector: str, conditions: list, out: str) -> Tuple[List[dict], list[CELL]]:
         """
-        Sends the Overpass QL query to the Overpass API and returns the parsed JSON response.
+        Retrieve cached data and identify missing cache quadrants.

         Args:
-            query (str): The Overpass QL query to be sent to the Overpass API.
+            overlapping_cells (list): Cells to check for cached data.
+            osm_types (list): OSM types (e.g., 'node', 'way').
+            selector (str): Key or tag to filter OSM elements.
+            conditions (list): Additional conditions to apply.
+            out (str): Output format.

         Returns:
-            dict: The parsed JSON response from the Overpass API, or None if the request fails.
+            tuple: A tuple containing:
+                - cached_responses (list): List of cached data found.
+                - non_cached_cells (list(tuple)): List of cells with missing data.
         """
+        cell_key_dict = {}
+        for cell in overlapping_cells :
+            for elem in osm_types :
+                key_str = f"{elem}[{selector}]{conditions}({','.join(map(str, cell))})"
+
+            cell_key_dict[cell] = get_cache_key(key_str)
+
+        cached_responses = []
+        non_cached_cells = []
+
+        # Retrieve the cached data and mark the missing entries as hollow
+        for cell, key in cell_key_dict.items():
+            cached_data = self.caching_strategy.get(key)
+            if cached_data is not None :
+                cached_responses += cached_data
+            else:
+                self.caching_strategy.set_hollow(key, cell, osm_types, selector, conditions, out)
+                non_cached_cells.append(cell)
+
+        return cached_responses, non_cached_cells
+
+
+    @staticmethod
+    def _build_query_from_hollow(json_data: dict) -> Tuple[str, str]:
+        """
+        Build query string using information from a hollow cache entry.
+        """
+        # Extract values from the JSON object
+        key = json_data.get('key')
+        cell = tuple(json_data.get('cell'))
+        bbox = Overpass._get_bbox_from_grid_cell(cell)
+        osm_types = json_data.get('osm_types')
+        selector = json_data.get('selector')
+        conditions = json_data.get('conditions')
+        out = json_data.get('out')
+
+        query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
+        return query_str, key
+
+
+    @staticmethod
+    def _get_overlapping_cells(query_bbox: tuple) -> List[CELL]:
+        """
+        Returns a set of all grid cells that overlap with the given bounding box.
+        """
+        # Extract location from the query bbox
+        lat_min, lon_min, lat_max, lon_max = query_bbox
+
+        min_lat_cell, min_lon_cell = Overpass._get_grid_cell(lat_min, lon_min)
+        max_lat_cell, max_lon_cell = Overpass._get_grid_cell(lat_max, lon_max)
+
+        overlapping_cells = set()
+        for lat_idx in range(min_lat_cell, max_lat_cell + 1):
+            for lon_idx in range(min_lon_cell, max_lon_cell + 1):
+                overlapping_cells.add((lat_idx, lon_idx))
+
+        return overlapping_cells
+
+
+    @staticmethod
+    def _get_grid_cell(lat: float, lon: float) -> CELL:
+        """
+        Returns the grid cell coordinates for a given latitude and longitude.
+        Each grid cell is 0.05°lat x 0.05°lon resolution in size.
+        """
+        lat_index = math.floor(lat / RESOLUTION)
+        lon_index = math.floor(lon / RESOLUTION)
+        return (lat_index, lon_index)
+
+
+    @staticmethod
+    def _get_bbox_from_grid_cell(cell: CELL) -> BBOX:
+        """
+        Returns the bounding box for a given grid cell index.
+        Each grid cell is resolution x resolution in size.
+
+        The bounding box is returned as (min_lat, min_lon, max_lat, max_lon).
+        """
+        # Calculate the southwest (min_lat, min_lon) corner of the bounding box
+        min_lat = round(cell[0] * RESOLUTION, 2)
+        min_lon = round(cell[1] * RESOLUTION, 2)
+
+        # Calculate the northeast (max_lat, max_lon) corner of the bounding box
+        max_lat = round((cell[0] + 1) * RESOLUTION, 2)
+        max_lon = round((cell[1] + 1) * RESOLUTION, 2)
+
+        return (min_lat, min_lon, max_lat, max_lon)
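A quick worked example of the 0.05° grid arithmetic above (coordinates chosen for illustration):

    Overpass._get_grid_cell(48.8566, 2.3522)      # -> (977, 47), since floor(48.8566 / 0.05) = 977
    Overpass._get_bbox_from_grid_cell((977, 47))  # -> (48.85, 2.35, 48.9, 2.4)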
+    @staticmethod
+    def _get_non_cached_bbox(non_cached_cells: List[CELL], original_bbox: BBOX):
+        """
+        Calculate the non-cached bounding box by excluding cached cells.
+
+        Args:
+            non_cached_cells (list): The list of cells that were not found in the cache.
+            original_bbox (tuple): The original bounding box (min_lat, min_lon, max_lat, max_lon).
+
+        Returns:
+            tuple: The new bounding box that excludes cached cells, or None if all cells are cached.
+        """
+        if not non_cached_cells:
+            return None  # All cells were cached
+
+        # Initialize the non-cached bounding box with extreme values
+        min_lat, min_lon, max_lat, max_lon = float('inf'), float('inf'), float('-inf'), float('-inf')
+
+        # Iterate over non-cached cells to find the new bounding box
+        for cell in non_cached_cells:
+            cell_min_lat, cell_min_lon, cell_max_lat, cell_max_lon = Overpass._get_bbox_from_grid_cell(cell)
+
+            min_lat = min(min_lat, cell_min_lat)
+            min_lon = min(min_lon, cell_min_lon)
+            max_lat = max(max_lat, cell_max_lat)
+            max_lon = max(max_lon, cell_max_lon)
+
+        # If no update to bounding box, return the original
+        if min_lat == float('inf') or min_lon == float('inf'):
+            return None
+
+        return (max(min_lat, original_bbox[0]),
+                max(min_lon, original_bbox[1]),
+                min(max_lat, original_bbox[2]),
+                min(max_lon, original_bbox[3]))
+
+
+    @staticmethod
+    def _filter_landmarks(elements: List[dict], bbox: BBOX) -> List[dict]:
+        """
+        Filters elements based on whether their coordinates are inside the given bbox.
+
+        Args:
+            - elements (list of dict): List of elements containing coordinates.
+            - bbox (tuple): A bounding box defined as (min_lat, min_lon, max_lat, max_lon).
+
+        Returns:
+            - list: A list of elements whose coordinates are inside the bounding box.
+        """
-        # Generate a cache key for the current query
-        cache_key = get_cache_key(query)
+        filtered_elements = []
+        min_lat, min_lon, max_lat, max_lon = bbox

-        # Try to fetch the result from the cache
-        cached_response = self.caching_strategy.get(cache_key)
-        if cached_response is not None :
-            logger.debug("Cache hit.")
-            return cached_response
+        for elem in elements:
+            # Extract coordinates based on the 'type' of element
+            if elem.get('type') != 'node':
+                center = elem.get('center', {})
+                lat = float(center.get('lat', 0))
+                lon = float(center.get('lon', 0))
+            else:
+                lat = float(elem.get('lat', 0))
+                lon = float(elem.get('lon', 0))

-        # Prepare the data to be sent as POST request, encoded as bytes
-        data = urllib.parse.urlencode({'data': query}).encode('utf-8')
+            # Check if the coordinates fall within the given bounding box
+            if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon:
+                filtered_elements.append(elem)

-        try:
-            # Create a Request object with the specified URL, data, and headers
-            request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
-
-            # Send the request and read the response
-            with urllib.request.urlopen(request) as response:
-                # Read and decode the response
-                response_data = response.read().decode('utf-8')
-                root = ET.fromstring(response_data)
-
-            # Cache the response data as an ElementTree root
-            self.caching_strategy.set(cache_key, root)
-            logger.debug("Response data added to cache.")
-
-            return root
-
-        except urllib.error.URLError as e:
-            raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
+        return filtered_elements
-def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :
+def get_base_info(elem: dict, osm_type: OSM_TYPES, with_name=False) :
     """
     Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element.

@@ -136,7 +355,7 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :
     extracting coordinates either directly or from a center tag, depending on the element type.

     Args:
-        elem (ET.Element): The XML element representing the OSM entity.
+        elem (dict): The JSON element representing the OSM entity.
         osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates
             are extracted directly from the element; otherwise, from the 'center' tag.
         with_name (bool): Whether to extract and return the name of the element. If True, it attempts
@@ -150,7 +369,7 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :
     """
     # 1. extract coordinates
     if osm_type != 'node' :
-        center = elem.find('center')
+        center = elem.get('center')
         lat = float(center.get('lat'))
         lon = float(center.get('lon'))

@@ -165,7 +384,31 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :

     # 3. Extract name if specified and return
     if with_name :
-        name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None
+        name = elem.get('tags', {}).get('name')
         return osm_id, coords, name
     else :
         return osm_id, coords
+def fill_cache():
+    """
+    Scans the specified cache directory for files starting with 'hollow_' and attempts to load
+    their contents as JSON to fill the cache of the Overpass system.
+    """
+    overpass = Overpass()
+
+    with os.scandir(OSM_CACHE_DIR) as it:
+        for entry in it:
+            if entry.is_file() and entry.name.startswith('hollow_'):
+
+                try :
+                    # Read the whole file content as a string
+                    with open(entry.path, 'r') as f:
+                        # load data and fill the cache with the query and key
+                        json_data = json.load(f)
+                        overpass.fill_cache(json_data)
+                        # Now delete the file as the cache is filled
+                        os.remove(entry.path)
+
+                except Exception as exc :
+                    overpass.logger.error(f'An error occured while parsing file {entry.path} as .json file')
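Taken together, these pieces form a lazy refill loop: send_query writes a hollow_* placeholder for every cache miss, the /trip/new endpoint schedules fill_cache as a background task, and fill_cache replays each placeholder against Overpass and deletes it. A condensed sketch of the flow (illustrative arguments):

    overpass = Overpass()
    elems = overpass.send_query((48.85, 2.35, 48.9, 2.4), ['node'], 'tourism')  # misses -> hollow_* files
    fill_cache()                                                                # later, in the background
    elems = overpass.send_query((48.85, 2.35, 48.9, 2.4), ['node'], 'tourism')  # now served from cache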
@@ -1,12 +1,12 @@
-city_bbox_side: 7500 #m
+max_bbox_side: 4000 #m
 radius_close_to: 50
-church_coeff: 0.65
-nature_coeff: 1.35
+church_coeff: 0.55
+nature_coeff: 1.4
 overall_coeff: 10
 tag_exponent: 1.15
 image_bonus: 1.1
 viewpoint_bonus: 5
-wikipedia_bonus: 1.1
+wikipedia_bonus: 1.25
 name_bonus: 3
-N_important: 40
+N_important: 60
 pay_bonus: -1
@@ -3,4 +3,7 @@ detour_corridor_width: 300
 average_walking_speed: 4.8
 max_landmarks: 10
 max_landmarks_refiner: 20
-overshoot: 1.1
+overshoot: 0.0016
+time_limit: 1
+gap_rel: 0.05
+max_iter: 40
@@ -2,7 +2,7 @@

 from typing import Optional, Literal
 from uuid import uuid4, UUID
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, ConfigDict, Field


 # Output to frontend
@@ -51,7 +51,7 @@ class Landmark(BaseModel) :
     website_url : Optional[str] = None
     wiki_url : Optional[str] = None
     description : Optional[str] = None  # TODO future
-    duration : Optional[int] = 0
+    duration : Optional[int] = 5
     name_en : Optional[str] = None

     # Unique ID of a given landmark
@@ -144,8 +144,4 @@ class Toilets(BaseModel) :
         """
         return f'Toilets @{self.location}'

-    class Config:
-        """
-        This allows us to easily convert the model to and from dictionaries
-        """
-        from_attributes = True
+    model_config = ConfigDict(from_attributes=True)
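The last hunk migrates from pydantic v1's nested Config class to the v2 ConfigDict idiom; from_attributes=True preserves attribute-based construction. A one-line usage sketch (the source object is hypothetical):

    toilets = Toilets.model_validate(some_object_with_attributes)  # v2 replacement for from_orm()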
@@ -27,11 +27,13 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
|
||||
"/trip/new",
|
||||
json={
|
||||
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
|
||||
"nature": {"type": "nature", "score": 5},
|
||||
"shopping": {"type": "shopping", "score": 5},
|
||||
"nature": {"type": "nature", "score": 0},
|
||||
"shopping": {"type": "shopping", "score": 0},
|
||||
"max_time_minute": duration_minutes,
|
||||
"detour_tolerance_minute": 0},
|
||||
"start": [48.084588, 7.280405]
|
||||
# "start": [48.084588, 7.280405]
|
||||
# "start": [45.74445023349939, 4.8222687890538865]
|
||||
"start": [45.75156398104873, 4.827154464827647]
|
||||
}
|
||||
)
|
||||
result = response.json()
|
||||
@@ -50,13 +52,12 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
|
||||
# checks :
|
||||
assert response.status_code == 200 # check for successful planning
|
||||
assert isinstance(landmarks, list) # check that the return type is a list
|
||||
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2, f"Trip duration not within 20\% of desired length"
|
||||
assert len(landmarks) > 2 # check that there is something to visit
|
||||
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
|
||||
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
|
||||
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
|
||||
# assert 2!= 3
|
||||
|
||||
|
||||
def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
|
||||
"""
|
||||
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
|
||||
@@ -98,10 +99,9 @@ def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
|
||||
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
|
||||
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
|
||||
|
||||
|
||||
def test_cologne(client, request) : # pylint: disable=redefined-outer-name
|
||||
"""
|
||||
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
|
||||
Test n°3 : Custom test in Cologne to ensure proper decision making in crowded area.
|
||||
|
||||
Args:
|
||||
client:
|
||||
@@ -142,7 +142,7 @@ def test_cologne(client, request) : # pylint: disable=redefined-outer-name
|
||||
|
||||
def test_strasbourg(client, request) : # pylint: disable=redefined-outer-name
|
||||
"""
|
||||
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
|
||||
Test n°4 : Custom test in Strasbourg to ensure proper decision making in crowded area.
|
||||
|
||||
Args:
|
||||
client:
|
||||
@@ -183,7 +183,7 @@ def test_strasbourg(client, request) : # pylint: disable=redefined-outer-name
|
||||
|
||||
def test_zurich(client, request) : # pylint: disable=redefined-outer-name
|
||||
"""
|
||||
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
|
||||
Test n°5 : Custom test in Zurich to ensure proper decision making in crowded area.
|
||||
|
||||
Args:
|
||||
client:
|
||||
@@ -224,24 +224,24 @@ def test_zurich(client, request) : # pylint: disable=redefined-outer-name
|
||||
|
||||
def test_paris(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°2 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area.
Test n°6 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area.

Args:
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 300
duration_minutes = 200

response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"nature": {"type": "nature", "score": 0},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [48.86248803298562, 2.346451131285925]
"start": [48.85468881798671, 2.3423925755998374]
}
)
result = response.json()
@@ -265,7 +265,7 @@ def test_paris(client, request) : # pylint: disable=redefined-outer-name

def test_new_york(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°2 : Custom test in New York (les Halles) centre to ensure proper decision making in crowded area.
Test n°7 : Custom test in New York to ensure proper decision making in crowded area.

Args:
client:
@@ -306,7 +306,7 @@ def test_new_york(client, request) : # pylint: disable=redefined-outer-name

def test_shopping(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°3 : Custom test in Lyon centre to ensure shopping clusters are found.
Test n°8 : Custom test in Lyon centre to ensure shopping clusters are found.

Args:
client:
@@ -335,11 +335,11 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)

for elem in landmarks :
print(elem)
# for elem in landmarks :
# print(elem)

# checks :
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
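
The ±20% duration window above is repeated verbatim in every test; a minimal pytest helper could factor it out. This is a sketch, not part of the diff; assert_trip_duration is a hypothetical name and assumes only the response shape shown in the hunks above.

def assert_trip_duration(result: dict, duration_minutes: int, tolerance: float = 0.2):
    """Check that the planned trip stays within the given tolerance of the requested time."""
    total = result['total_time']
    assert duration_minutes * (1 - tolerance) < total, \
        f"Trip too short: {total} instead of {duration_minutes}"
    assert duration_minutes * (1 + tolerance) > total, \
        f"Trip too long: {total} instead of {duration_minutes}"
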
@@ -9,7 +9,8 @@ from pydantic import BaseModel
from ..overpass.overpass import Overpass, get_base_info
from ..structs.landmark import Landmark
from .get_time_distance import get_distance
from ..constants import OSM_CACHE_DIR
from .utils import create_bbox


# silence the overpass logger
@@ -79,8 +80,7 @@ class ClusterManager:
bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
"""
# Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)

self.overpass = Overpass()

self.cluster_type = cluster_type
if cluster_type == 'shopping' :
@@ -95,32 +95,29 @@ class ClusterManager:
raise NotImplementedError("Please choose only an available option for cluster detection")

# Initialize the points for cluster detection
query = self.overpass.build_query(
area = bbox,
try:
result = self.overpass.send_query(
bbox = bbox,
osm_types = osm_types,
selector = sel,
out = out
)
self.logger.debug(f"Cluster query: {query}")

try:
result = self.overpass.send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
self.logger.error(f"Error fetching clusters: {e}")

if result is None :
self.logger.error(f"Error fetching {cluster_type} clusters, overpass query returned None.")
self.logger.debug(f"Found no {cluster_type} clusters, overpass query returned no datapoints.")
self.valid = False

else :
points = []
for osm_type in osm_types :
for elem in result.findall(osm_type):

# Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type)
if coords is not None :
points.append(coords)
for elem in result:
osm_type = elem.get('type')

# Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type)
if coords is not None :
points.append(coords)

if points :
self.all_points = np.array(points)
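
Disentangled from the interleaved old and new lines above: the refactor drops the separate build_query step and has send_query take the query parameters directly, returning JSON elements instead of an XML tree. As a sketch of the new call shape (the selector value is illustrative only):

result = self.overpass.send_query(
    bbox = bbox,                              # (lat_min, lon_min, lat_max, lon_max)
    osm_types = ['node', 'way', 'relation'],
    selector = '"shop"="mall"',               # hypothetical selector for illustration
    out = 'center'
)
# Each returned element is a JSON dict carrying its own 'type' key, so the
# per-osm_type findall() loops collapse into a single iteration over result.
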
@@ -137,7 +134,7 @@ class ClusterManager:

# Check that there is at least 1 cluster
if len(set(labels)) > 1 :
self.logger.debug(f"Found {len(set(labels))} different clusters.")
self.logger.info(f"Found {len(set(labels))} different {cluster_type} clusters.")
# Separate clustered points and noise points
self.cluster_points = self.all_points[labels != -1]
self.cluster_labels = labels[labels != -1]
@@ -145,7 +142,7 @@ class ClusterManager:
self.valid = True

else :
self.logger.debug(f"Detected 0 {cluster_type} clusters.")
self.logger.info(f"Found 0 {cluster_type} clusters.")
self.valid = False

else :
@@ -218,9 +215,8 @@ class ClusterManager:
"""

# Define the bounding box for a given radius around the coordinates
lat, lon = cluster.centroid
bbox = (1000, lat, lon)

bbox = create_bbox(cluster.centroid, 1000)

# Query neighborhoods and shopping malls
selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"']

@@ -238,37 +234,34 @@ class ClusterManager:
osm_types = ['node', 'way', 'relation']

for sel in selectors :
query = self.overpass.build_query(
area = bbox,
osm_types = osm_types,
selector = sel,
out = 'ids center'
)

try:
result = self.overpass.send_query(query)
result = self.overpass.send_query(bbox = bbox,
osm_types = osm_types,
selector = sel,
out = 'ids center'
)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
self.logger.error(f"Error fetching clusters: {e}")
continue

if result is None :
self.logger.error(f"Error fetching landmarks: {e}")
self.logger.error(f"Error fetching clusters: {e}")
continue

for osm_type in osm_types :
for elem in result.findall(osm_type):
for elem in result:
osm_type = elem.get('type')

id, coords, name = get_base_info(elem, osm_type, with_name=True)
id, coords, name = get_base_info(elem, osm_type, with_name=True)

if name is None or coords is None :
continue
if name is None or coords is None :
continue

d = get_distance(cluster.centroid, coords)
if d < min_dist :
min_dist = d
new_name = name
osm_type = osm_type # Add type: 'way' or 'relation'
osm_id = id # Add OSM id
d = get_distance(cluster.centroid, coords)
if d < min_dist :
min_dist = d
new_name = name
osm_type = osm_type # Add type: 'way' or 'relation'
osm_id = id # Add OSM id

return Landmark(
name=new_name,

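
Stepping back from the hunks above: the labels array in ClusterManager follows scikit-learn's DBSCAN convention, in which noise points receive the label -1, so labels != -1 keeps exactly the clustered points. A self-contained sketch with synthetic data (eps and min_samples are illustrative, not the repository's parameters):

import numpy as np
from sklearn.cluster import DBSCAN

points = [(45.7578, 4.8320), (45.7579, 4.8322), (45.7420, 4.8100)]  # synthetic (lat, lon)
all_points = np.array(points)
labels = DBSCAN(eps=0.00075, min_samples=2).fit(all_points).labels_  # -> [0, 0, -1]

cluster_points = all_points[labels != -1]   # keeps the two nearby points
cluster_labels = labels[labels != -1]
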
@@ -1,19 +1,15 @@
"""Module used to import data from OSM and arrange them in categories."""
import logging
import xml.etree.ElementTree as ET
import yaml


from ..structs.preferences import Preferences
from ..structs.landmark import Landmark
from .take_most_important import take_most_important
from .cluster_manager import ClusterManager
from ..overpass.overpass import Overpass, get_base_info
from .utils import create_bbox

from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR

# silence the overpass logger
logging.getLogger('Overpass').setLevel(level=logging.CRITICAL)
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH


class LandmarkManager:
@@ -37,8 +33,7 @@ class LandmarkManager:

with LANDMARK_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.max_bbox_side = parameters['city_bbox_side']
self.radius_close_to = parameters['radius_close_to']
self.max_bbox_side = parameters['max_bbox_side']
self.church_coeff = parameters['church_coeff']
self.nature_coeff = parameters['nature_coeff']
self.overall_coeff = parameters['overall_coeff']
@@ -56,7 +51,7 @@ class LandmarkManager:
self.detour_factor = parameters['detour_factor']

# Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
self.overpass = Overpass()

self.logger.info('LandmarkManager successfully initialized.')

@@ -80,39 +75,39 @@ class LandmarkManager:
"""
self.logger.debug('Starting to fetch landmarks...')
max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor)
reachable_bbox_side = min(max_walk_dist, self.max_bbox_side)
radius = min(max_walk_dist, int(self.max_bbox_side/2))

# use set to avoid duplicates, this requires some __methods__ to be set in Landmark
all_landmarks = set()

# Create a bbox using the around technique, tuple of strings
bbox = tuple((min(2000, reachable_bbox_side/2), center_coordinates[0], center_coordinates[1]))
bbox = create_bbox(center_coordinates, radius)

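
For reference, a worked instance of the max_walk_dist formula above (the parameter values are illustrative, not the repository's configuration): half the time budget, converted to hours, times the walking speed in km/h, times 1000 for meters, divided by the detour factor.

max_time_minute = 180   # hypothetical user budget
walking_speed = 4.8     # km/h, assumed parameter value
detour_factor = 1.4     # assumed parameter value
max_walk_dist = int((max_time_minute/2)/60 * walking_speed * 1000 / detour_factor)
# (180/2)/60 = 1.5 h; 1.5 * 4.8 = 7.2 km; 7200 m / 1.4 -> 5142 m
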
# list for sightseeing
if preferences.sightseeing.score != 0:
self.logger.debug('Fetching sightseeing landmarks...')
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score)
all_landmarks.update(current_landmarks)
self.logger.debug('Fetching sightseeing clusters...')
self.logger.info(f'Found {len(current_landmarks)} sightseeing landmarks')

# special pipeline for historic neighborhoods
neighborhood_manager = ClusterManager(bbox, 'sightseeing')
historic_clusters = neighborhood_manager.generate_clusters()
all_landmarks.update(historic_clusters)
self.logger.debug('Sightseeing clusters done')

# list for nature
if preferences.nature.score != 0:
self.logger.debug('Fetching nature landmarks...')
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score)
all_landmarks.update(current_landmarks)
self.logger.info(f'Found {len(current_landmarks)} nature landmarks')


# list for shopping
if preferences.shopping.score != 0:
self.logger.debug('Fetching shopping landmarks...')
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score)
self.logger.debug('Fetching shopping clusters...')
self.logger.info(f'Found {len(current_landmarks)} shopping landmarks')

# set time for all shopping activities :
for landmark in current_landmarks :
@@ -123,8 +118,6 @@ class LandmarkManager:
shopping_manager = ClusterManager(bbox, 'shopping')
shopping_clusters = shopping_manager.generate_clusters()
all_landmarks.update(shopping_clusters)
self.logger.debug('Shopping clusters done')


landmarks_constrained = take_most_important(all_landmarks, self.n_important)
@@ -179,7 +172,7 @@ class LandmarkManager:
"""
return_list = []

if landmarktype == 'nature' : query_conditions = []
if landmarktype == 'nature' : query_conditions = None
else : query_conditions = ['count_tags()>5']

# caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously
@@ -190,119 +183,115 @@ class LandmarkManager:
osm_types = ['way', 'relation']

if 'viewpoint' in sel :
query_conditions = []
query_conditions = None
osm_types.append('node')

query = self.overpass.build_query(
area = bbox,
osm_types = osm_types,
selector = sel,
conditions = query_conditions, # except for nature....
out = 'center'
)
self.logger.debug(f"Query: {query}")

# Send the overpass query
try:
result = self.overpass.send_query(query)
result = self.overpass.send_query(
bbox = bbox,
osm_types = osm_types,
selector = sel,
conditions = query_conditions, # except for nature....
out = 'ids center tags'
)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
continue

return_list += self.xml_to_landmarks(result, landmarktype, preference_level)
return_list += self._to_landmarks(result, landmarktype, preference_level)

self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")

return return_list


def xml_to_landmarks(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]:
def _to_landmarks(self, elements: list, landmarktype, preference_level) -> list[Landmark]:
"""
Parse the Overpass API result and extract landmarks.

This method processes the XML root element returned by the Overpass API and
This method processes the JSON elements returned by the Overpass API and
extracts landmarks of types 'node', 'way', and 'relation'. It retrieves
relevant information such as name, coordinates, and tags, and converts them
into Landmark objects.

Args:
root (ET.Element): The root element of the XML response from Overpass API.
elements (list): The elements of the JSON response from the Overpass API.
elem_type (str): The type of landmark (e.g., node, way, relation).

Returns:
list[Landmark]: A list of Landmark objects extracted from the XML data.
list[Landmark]: A list of Landmark objects extracted from the JSON data.
"""
if root is None :
if elements is None :
return []

landmarks = []
for osm_type in ['node', 'way', 'relation'] :
for elem in root.findall(osm_type):
for elem in elements:
osm_type = elem.get('type')

id, coords, name = get_base_info(elem, osm_type, with_name=True)

if name is None or coords is None :
continue

tags = elem.findall('tag')

# Convert this to Landmark object
landmark = Landmark(name=name,
type=landmarktype,
location=coords,
osm_id=id,
osm_type=osm_type,
attractiveness=0,
n_tags=len(tags))

# Browse through tags to add information to landmark.
for tag in tags:
key = tag.get('k')
value = tag.get('v')

# Skip this landmark if not suitable.
if key == 'building:part' and value == 'yes' :
break
if 'disused:' in key :
break
if 'boundary:' in key :
break
if 'shop' in key and landmarktype != 'shopping' :
break
# if value == 'apartments' :
# break

# Fill in the other attributes.
if key == 'image' :
landmark.image_url = value
if key == 'website' :
landmark.website_url = value
if key == 'place_of_worship' :
landmark.is_place_of_worship = True
if key == 'wikipedia' :
landmark.wiki_url = value
if key == 'name:en' :
landmark.name_en = value
if 'building:' in key or 'pay' in key :
landmark.n_tags -= 1

# Set the duration.
if value in ['museum', 'aquarium', 'planetarium'] :
landmark.duration = 60
elif value == 'viewpoint' :
landmark.is_viewpoint = True
landmark.duration = 10
elif value == 'cathedral' :
landmark.is_place_of_worship = False
landmark.duration = 10
else :
landmark.duration = 5

else:
self.set_landmark_score(landmark, landmarktype, preference_level)
landmarks.append(landmark)
id, coords, name = get_base_info(elem, osm_type, with_name=True)

if name is None or coords is None :
continue

tags = elem.get('tags')

# Convert this to Landmark object
landmark = Landmark(name=name,
type=landmarktype,
location=coords,
osm_id=id,
osm_type=osm_type,
attractiveness=0,
n_tags=len(tags))

# self.logger.debug('added landmark.')

# Browse through tags to add information to landmark.
for key, value in tags.items():

# Skip this landmark if not suitable.
if key == 'building:part' and value == 'yes' :
break
if 'disused:' in key :
break
if 'boundary:' in key :
break
if 'shop' in key and landmarktype != 'shopping' :
break
# if value == 'apartments' :
# break

# Fill in the other attributes.
if key == 'image' :
landmark.image_url = value
if key == 'website' :
landmark.website_url = value
if key == 'place_of_worship' :
landmark.is_place_of_worship = True
if key == 'wikipedia' :
landmark.wiki_url = value
if key == 'name:en' :
landmark.name_en = value
if 'building:' in key or 'pay' in key :
landmark.n_tags -= 1

# Set the duration.
if value in ['museum', 'aquarium', 'planetarium'] :
landmark.duration = 60
elif value == 'viewpoint' :
landmark.is_viewpoint = True
landmark.duration = 10
elif value == 'cathedral' :
landmark.is_place_of_worship = False
landmark.duration = 10

else:
self.set_landmark_score(landmark, landmarktype, preference_level)
landmarks.append(landmark)

continue

return landmarks

def dict_to_selector_list(d: dict) -> list:

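
The core of this migration is visible in the two tag loops above: the XML path rebuilt key/value pairs from <tag> children, while the JSON path reads the tags dict straight off the element. A self-contained illustration (synthetic data, not from the repository):

import xml.etree.ElementTree as ET

# Old XML shape: tags are child elements with 'k'/'v' attributes.
elem_xml = ET.fromstring('<way><tag k="wikipedia" v="en:Fourviere"/><tag k="fee" v="yes"/></way>')
tags_old = {tag.get('k'): tag.get('v') for tag in elem_xml.findall('tag')}

# New JSON shape: tags arrive as a plain dict on the element.
elem_json = {'type': 'way', 'tags': {'wikipedia': 'en:Fourviere', 'fee': 'yes'}}
tags_new = elem_json.get('tags')

assert tags_old == tags_new  # both yield the same dict
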
@@ -1,10 +1,9 @@
"""Module for finding public toilets around given coordinates."""
import logging
import xml.etree.ElementTree as ET

from ..overpass.overpass import Overpass, get_base_info
from ..structs.landmark import Toilets
from ..constants import OSM_CACHE_DIR
from .utils import create_bbox


# silence the overpass logger
@@ -41,7 +40,7 @@ class ToiletsManager:
self.location = location

# Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
self.overpass = Overpass()


def generate_toilet_list(self) -> list[Toilets] :
@@ -53,73 +52,71 @@ class ToiletsManager:
list[Toilets]: A list of `Toilets` objects containing detailed information
about the toilets found around the given coordinates.
"""
bbox = tuple((self.radius, self.location[0], self.location[1]))
bbox = create_bbox(self.location, self.radius)
osm_types = ['node', 'way', 'relation']
toilets_list = []

query = self.overpass.build_query(
area = bbox,
osm_types = osm_types,
selector = '"amenity"="toilets"',
out = 'ids center tags'
)
self.logger.debug(f"Query: {query}")

query = Overpass.build_query(
bbox = bbox,
osm_types = osm_types,
selector = '"amenity"="toilets"',
out = 'ids center tags'
)
try:
result = self.overpass.send_query(query)
result = self.overpass.fetch_data_from_api(query_str=query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
return None

toilets_list = self.xml_to_toilets(result)
toilets_list = self.to_toilets(result)

return toilets_list


def xml_to_toilets(self, root: ET.Element) -> list[Toilets]:
def to_toilets(self, elements: list) -> list[Toilets]:
"""
Parse the Overpass API result and extract landmarks.

This method processes the XML root element returned by the Overpass API and
This method processes the JSON elements returned by the Overpass API and
extracts landmarks of types 'node', 'way', and 'relation'. It retrieves
relevant information such as name, coordinates, and tags, and converts them
into Landmark objects.

Args:
root (ET.Element): The root element of the XML response from Overpass API.
elements (list): The OSM elements of the JSON response from the Overpass API.
elem_type (str): The type of landmark (e.g., node, way, relation).

Returns:
list[Landmark]: A list of Landmark objects extracted from the XML data.
list[Landmark]: A list of Landmark objects extracted from the JSON data.
"""
if root is None :
if elements is None :
return []

toilets_list = []
for osm_type in ['node', 'way', 'relation'] :
for elem in root.findall(osm_type):
# Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type)
if coords is None :
continue
for elem in elements:
osm_type = elem.get('type')
# Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type)
if coords is None :
continue

toilets = Toilets(location=coords)
toilets = Toilets(location=coords)

# Extract tags as a dictionary
tags = {tag.get('k'): tag.get('v') for tag in elem.findall('tag')}
# Extract tags as a dictionary
tags = elem.get('tags')

if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes':
toilets.wheelchair = True
if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes':
toilets.wheelchair = True

if 'changing_table' in tags.keys() and tags['changing_table'] == 'yes':
toilets.changing_table = True
if 'changing_table' in tags.keys() and tags['changing_table'] == 'yes':
toilets.changing_table = True

if 'fee' in tags.keys() and tags['fee'] == 'yes':
toilets.fee = True
if 'fee' in tags.keys() and tags['fee'] == 'yes':
toilets.fee = True

if 'opening_hours' in tags.keys() :
toilets.opening_hours = tags['opening_hours']
if 'opening_hours' in tags.keys() :
toilets.opening_hours = tags['opening_hours']

toilets_list.append(toilets)
toilets_list.append(toilets)

return toilets_list

27
backend/src/utils/utils.py
Normal file
@@ -0,0 +1,27 @@
"""Various helper functions"""
|
||||
import math as m
|
||||
|
||||
def create_bbox(coords: tuple[float, float], radius: int):
|
||||
"""
|
||||
Create a bounding box around the given coordinates.
|
||||
|
||||
Args:
|
||||
coords (tuple[float, float]): The latitude and longitude of the center of the bounding box.
|
||||
radius (int): The half-side length of the bounding box in meters.
|
||||
|
||||
Returns:
|
||||
tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude
|
||||
defining the bounding box.
|
||||
"""
|
||||
# Earth's radius in meters
|
||||
R = 6378137
|
||||
lat, lon = coords
|
||||
d_lat = radius / R
|
||||
d_lon = radius / (R * m.cos(m.pi * lat / 180))
|
||||
|
||||
lat_min = lat - d_lat * 180 / m.pi
|
||||
lat_max = lat + d_lat * 180 / m.pi
|
||||
lon_min = lon - d_lon * 180 / m.pi
|
||||
lon_max = lon + d_lon * 180 / m.pi
|
||||
|
||||
return (lat_min, lon_min, lat_max, lon_max)
|
||||
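
A quick usage check of create_bbox, using the Lyon start coordinates from the tests above (printed values are approximate):

from src.utils.utils import create_bbox  # import path assumed from backend/src/utils/utils.py

bbox = create_bbox((45.7516, 4.8272), 1000)
# 1000 m is roughly 0.0090 deg of latitude and, at this latitude, about
# 0.0129 deg of longitude, so the result is approximately
# (45.7426, 4.8143, 45.7606, 4.8401) as (lat_min, lon_min, lat_max, lon_max)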