Update pages/5_đ_VertXtractor.py
Browse files- pages/5_đ_VertXtractor.py +93 -145
pages/5_đ_VertXtractor.py
CHANGED
@@ -1,157 +1,105 @@
|
|
1 |
-
import
|
2 |
-
import csv
|
3 |
-
import json
|
4 |
-
import logging
|
5 |
import requests
|
6 |
-
import
|
7 |
-
from
|
8 |
-
from osgeo import gdal
|
9 |
import geopandas as gpd
|
10 |
import rasterio
|
11 |
-
|
12 |
-
|
13 |
-
|
14 |
-
|
15 |
-
|
16 |
-
logger = logging.getLogger(__name__)
|
17 |
|
18 |
-
#
|
19 |
-
SUPPORTED_FORMATS = ['.tif', '.png', '.tiff', '.TIFF']
|
20 |
-
NON_IMAGE_LAYERS = 'swissalti'
|
21 |
STAC_API_URL = "https://data.geo.admin.ch/api/stac/v0.9/collections/"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
22 |
|
23 |
-
|
24 |
-
|
25 |
-
|
26 |
-
|
27 |
-
|
28 |
-
|
29 |
-
|
30 |
-
|
31 |
-
|
32 |
-
|
33 |
-
|
34 |
-
|
35 |
-
Returns:
|
36 |
-
Tuple[List[str], bool]: List of item URLs and a boolean indicating if there are more than 100 items.
|
37 |
-
"""
|
38 |
-
try:
|
39 |
-
url = f"{STAC_API_URL}{product_name}/items?bbox={ll_lon},{ll_lat},{ur_lon},{ur_lat}"
|
40 |
-
response = requests.get(url)
|
41 |
-
response.raise_for_status()
|
42 |
-
items_result = response.json()
|
43 |
-
|
44 |
-
assets = items_result.get('features', [])
|
45 |
-
item_files = [asset['assets']['data']['href'] for asset in assets if 'data' in asset['assets']]
|
46 |
-
|
47 |
-
more_than_100 = len(items_result.get('links', [])) >= 6
|
48 |
-
|
49 |
-
if more_than_100 and not first_100:
|
50 |
-
while 'next' in [link['rel'] for link in items_result.get('links', [])]:
|
51 |
-
next_url = next(link['href'] for link in items_result['links'] if link['rel'] == 'next')
|
52 |
-
response = requests.get(next_url)
|
53 |
-
response.raise_for_status()
|
54 |
-
items_result = response.json()
|
55 |
-
assets = items_result.get('features', [])
|
56 |
-
item_files.extend([asset['assets']['data']['href'] for asset in assets if 'data' in asset['assets']])
|
57 |
-
|
58 |
-
# Filter items based on specific conditions
|
59 |
-
item_files = filter_items(item_files, product_name)
|
60 |
-
|
61 |
-
return item_files, more_than_100
|
62 |
-
except requests.RequestException as e:
|
63 |
-
logger.error(f"Error retrieving items: {e}")
|
64 |
-
return [], False
|
65 |
-
|
66 |
-
def filter_items(items: List[str], product_name: str) -> List[str]:
|
67 |
-
"""
|
68 |
-
Filter items based on specific conditions.
|
69 |
-
|
70 |
-
Args:
|
71 |
-
items (List[str]): List of item URLs.
|
72 |
-
product_name (str): Name of the product.
|
73 |
-
|
74 |
-
Returns:
|
75 |
-
List[str]: Filtered list of item URLs.
|
76 |
-
"""
|
77 |
-
if "_krel_" in items[0]:
|
78 |
-
items = [i for i in items if "_krel_" in i]
|
79 |
-
|
80 |
-
if "_0.1_" in items[0]:
|
81 |
-
items = [i for i in items if "_0.1_" in i]
|
82 |
-
|
83 |
-
if product_name == 'ch.swisstopo.swissalti3d':
|
84 |
-
items = [i for i in items if ".tif" in i]
|
85 |
-
items = [i for i in items if "_0.5_" in i]
|
86 |
-
|
87 |
-
return items
|
88 |
|
89 |
-
def
|
90 |
-
"""
|
91 |
-
|
92 |
-
|
93 |
-
|
94 |
-
|
95 |
-
|
96 |
-
|
97 |
-
|
98 |
-
|
99 |
-
|
100 |
-
Returns:
|
101 |
-
str: Path to the created CSV file.
|
102 |
-
"""
|
103 |
-
try:
|
104 |
-
coords = f"{ll_lon}_{ll_lat}_{ur_lon}_{ur_lat}"
|
105 |
-
csv_filepath = os.path.join(os.getcwd(), f"{product_name}{coords}.csv")
|
106 |
-
|
107 |
-
item_files, _ = get_items(product_name, ll_lon, ll_lat, ur_lon, ur_lat, first_100=False)
|
108 |
-
|
109 |
-
with open(csv_filepath, 'w', newline='') as f:
|
110 |
-
writer = csv.writer(f)
|
111 |
-
writer.writerows([[item] for item in item_files])
|
112 |
|
113 |
-
|
114 |
-
|
115 |
-
|
116 |
-
return ""
|
117 |
-
|
118 |
-
def process_csv(csv_filepath: str, bbox: Optional[Tuple[float, float, float, float]] = None) -> None:
|
119 |
-
"""
|
120 |
-
Process the CSV file containing download URLs.
|
121 |
-
|
122 |
-
Args:
|
123 |
-
csv_filepath (str): Path to the CSV file.
|
124 |
-
bbox (Optional[Tuple[float, float, float, float]]): Bounding box for cropping (minx, miny, maxx, maxy).
|
125 |
-
"""
|
126 |
-
try:
|
127 |
-
download_dir, order_name = os.path.split(os.path.abspath(csv_filepath))
|
128 |
-
os.chdir(download_dir)
|
129 |
-
|
130 |
-
with open(csv_filepath, 'r') as file:
|
131 |
-
urls = file.readlines()
|
132 |
-
|
133 |
-
for i, url in enumerate(urls):
|
134 |
-
url = url.strip()
|
135 |
-
filename = os.path.join(download_dir, url.rsplit('/', 1)[-1])
|
136 |
|
137 |
-
|
138 |
-
|
139 |
-
|
|
|
|
|
|
|
|
|
140 |
|
141 |
-
|
142 |
-
|
|
|
143 |
|
144 |
-
|
145 |
-
|
146 |
-
|
147 |
-
|
148 |
-
|
149 |
-
|
150 |
-
|
151 |
-
|
152 |
-
|
153 |
-
|
154 |
-
|
155 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
156 |
|
157 |
-
|
|
|
|
1 |
+
import streamlit as st
|
|
|
|
|
|
|
2 |
import requests
|
3 |
+
import folium
|
4 |
+
from streamlit_folium import folium_static
|
|
|
5 |
import geopandas as gpd
|
6 |
import rasterio
|
7 |
+
from rasterio.merge import merge
|
8 |
+
import os
|
9 |
+
import tempfile
|
10 |
+
import zipfile
|
11 |
+
from io import BytesIO
|
|
|
12 |
|
13 |
+
# Configurations et constantes
|
|
|
|
|
14 |
STAC_API_URL = "https://data.geo.admin.ch/api/stac/v0.9/collections/"
|
15 |
+
PRODUCTS = {
|
16 |
+
'Luftbild 10cm': 'ch.swisstopo.swissimage-dop10',
|
17 |
+
'Landeskarte 1:10': 'ch.swisstopo.landeskarte-farbe-10',
|
18 |
+
'Landeskarte 1:25': 'ch.swisstopo.pixelkarte-farbe-pk25.noscale',
|
19 |
+
'Landeskarte 1:50': 'ch.swisstopo.pixelkarte-farbe-pk50.noscale',
|
20 |
+
'Landeskarte 1:100': 'ch.swisstopo.pixelkarte-farbe-pk100.noscale',
|
21 |
+
'Landeskarte 1:200': 'ch.swisstopo.pixelkarte-farbe-pk200.noscale',
|
22 |
+
'Höhenmodell': 'ch.swisstopo.swissalti3d',
|
23 |
+
}
|
24 |
|
25 |
+
@st.cache_data
|
26 |
+
def get_items(product_name, bbox):
|
27 |
+
"""RĂ©cupĂšre les Ă©lĂ©ments pour une zone d'intĂ©rĂȘt (AOI)."""
|
28 |
+
url = f"{STAC_API_URL}{product_name}/items?bbox={bbox}"
|
29 |
+
response = requests.get(url)
|
30 |
+
if response.status_code == 200:
|
31 |
+
items = response.json().get('features', [])
|
32 |
+
return [item['assets']['data']['href'] for item in items if 'data' in item['assets']]
|
33 |
+
else:
|
34 |
+
st.error(f"Erreur lors de la récupération des éléments: {response.status_code}")
|
35 |
+
return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
36 |
|
37 |
+
def download_and_merge(urls):
|
38 |
+
"""Télécharge et fusionne les tuiles raster."""
|
39 |
+
with tempfile.TemporaryDirectory() as tmpdir:
|
40 |
+
files = []
|
41 |
+
for url in urls:
|
42 |
+
response = requests.get(url)
|
43 |
+
if response.status_code == 200:
|
44 |
+
file_path = os.path.join(tmpdir, os.path.basename(url))
|
45 |
+
with open(file_path, 'wb') as f:
|
46 |
+
f.write(response.content)
|
47 |
+
files.append(file_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
48 |
|
49 |
+
if files:
|
50 |
+
src_files_to_mosaic = [rasterio.open(fp) for fp in files]
|
51 |
+
mosaic, out_trans = merge(src_files_to_mosaic)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
52 |
|
53 |
+
out_meta = src_files_to_mosaic[0].meta.copy()
|
54 |
+
out_meta.update({
|
55 |
+
"driver": "GTiff",
|
56 |
+
"height": mosaic.shape[1],
|
57 |
+
"width": mosaic.shape[2],
|
58 |
+
"transform": out_trans
|
59 |
+
})
|
60 |
|
61 |
+
merged_path = os.path.join(tmpdir, "merged.tif")
|
62 |
+
with rasterio.open(merged_path, "w", **out_meta) as dest:
|
63 |
+
dest.write(mosaic)
|
64 |
|
65 |
+
return merged_path
|
66 |
+
return None
|
67 |
+
|
68 |
+
def main():
|
69 |
+
st.title("SwissTopo Map Downloader")
|
70 |
+
|
71 |
+
# SĂ©lection du produit
|
72 |
+
product = st.selectbox("Choisissez un produit", list(PRODUCTS.keys()))
|
73 |
+
|
74 |
+
# Carte pour sélectionner la bbox
|
75 |
+
m = folium.Map(location=[46.8182, 8.2275], zoom_start=8)
|
76 |
+
draw = folium.plugins.Draw(export=True)
|
77 |
+
draw.add_to(m)
|
78 |
+
folium_static(m)
|
79 |
+
|
80 |
+
# Récupération de la bbox dessinée
|
81 |
+
if 'last_active_drawing' in st.session_state:
|
82 |
+
bbox = st.session_state.last_active_drawing['geometry']['coordinates'][0]
|
83 |
+
bbox = f"{bbox[0][0]},{bbox[0][1]},{bbox[2][0]},{bbox[2][1]}"
|
84 |
+
st.write(f"BBox sélectionnée : {bbox}")
|
85 |
+
|
86 |
+
if st.button("Télécharger"):
|
87 |
+
with st.spinner("Téléchargement en cours..."):
|
88 |
+
urls = get_items(PRODUCTS[product], bbox)
|
89 |
+
if urls:
|
90 |
+
merged_file = download_and_merge(urls)
|
91 |
+
if merged_file:
|
92 |
+
with open(merged_file, "rb") as file:
|
93 |
+
btn = st.download_button(
|
94 |
+
label="Télécharger le fichier fusionné",
|
95 |
+
data=file,
|
96 |
+
file_name="merged_map.tif",
|
97 |
+
mime="image/tiff"
|
98 |
+
)
|
99 |
+
else:
|
100 |
+
st.error("Erreur lors de la fusion des fichiers.")
|
101 |
+
else:
|
102 |
+
st.warning("Aucune donnée trouvée pour cette zone.")
|
103 |
|
104 |
+
if __name__ == "__main__":
|
105 |
+
main()
|