import streamlit as st import geopandas as gpd import folium from streamlit_folium import folium_static, st_folium from folium.plugins import Draw import requests from shapely.geometry import box import json from PIL import Image import io import numpy as np import tempfile import os import urllib.request import zipfile st.set_page_config(layout="wide", page_title="Extracteur de données géospatiales Swisstopo") # Constants URL_STAC_SWISSTOPO_BASE = 'https://data.geo.admin.ch/api/stac/v0.9/collections/' DIC_LAYERS = { 'ortho': 'ch.swisstopo.swissimage-dop10', 'mnt': 'ch.swisstopo.swissalti3d', 'mns': 'ch.swisstopo.swisssurface3d-raster', 'bati3D': 'ch.swisstopo.swissbuildings3d_2', 'bati3D_v3': 'ch.swisstopo.swissbuildings3d_3_0', } FOLDER_NAME_SWISSTOPO = "swisstopo" NB_POLYGONES_MAX = 2000000 # Helper functions @st.cache_data def lv95towgs84(x, y): url = f'http://geodesy.geo.admin.ch/reframe/lv95towgs84?easting={x}&northing={y}&format=json' with urllib.request.urlopen(url) as f: json_res = json.loads(f.read().decode('utf-8')) return float(json_res['easting']), float(json_res['northing']) @st.cache_data def get_list_from_STAC_swisstopo(url, xmin, ymin, xmax, ymax, gdb=False): est, sud = lv95towgs84(xmin, ymin) ouest, nord = lv95towgs84(xmax, ymax) suffix_url = f"/items?bbox={est},{sud},{ouest},{nord}" url += suffix_url res = [] while url: with urllib.request.urlopen(url) as f: json_res = json.loads(f.read().decode('utf-8')) url = None for link in json_res.get('links', []): if link['rel'] == 'next': url = link['href'] for item in json_res['features']: for k, dic in item['assets'].items(): href = dic['href'] if gdb: if href.endswith('.gdb.zip') and len(href.split('/')[-1].split('_')) == 7: res.append(href) elif not href.endswith(('.xyz.zip', '.gdb.zip')): res.append(href) return res def suppr_doublons_list_ortho(lst): dic = {} for url in lst: nom, an, noflle, taille_px, epsg = url.split('/')[-1][:-4].split('_') dic.setdefault((noflle, float(taille_px)), []).append((an, url)) return [sorted(lst, reverse=True)[0][1] for noflle, lst in dic.items()] def suppr_doublons_bati3D(lst_url): dico = {} for dxf in [url for url in lst_url if url.endswith('.dxf.zip')]: *a, date, feuille = dxf.split('/')[-2].split('_') dico.setdefault(feuille, []).append((date, dxf)) return [sorted(liste, reverse=True)[0][1] for k, liste in dico.items()] def download_file(url, filename): with urllib.request.urlopen(url) as response, open(filename, 'wb') as out_file: out_file.write(response.read()) def unzip_file(zip_file, extract_to): with zipfile.ZipFile(zip_file, 'r') as zip_ref: zip_ref.extractall(extract_to) # Main Streamlit App def main(): st.title("Extracteur de données Swisstopo") # Sidebar for layer selection st.sidebar.header("Sélection des couches") mnt2m = st.sidebar.checkbox("MNT 2m") mnt50cm = st.sidebar.checkbox("MNT 50cm") mns = st.sidebar.checkbox("MNS (modèle numérique de surface)") bati3D = st.sidebar.checkbox("Bâtiments 3D") bati3D_v3 = st.sidebar.checkbox("Bâtiments 3D v3 (print 3D)") ortho2m = st.sidebar.checkbox("Orthophoto 2m") ortho10cm = st.sidebar.checkbox("Orthophoto 10cm") # Main content col1, col2 = st.columns([1, 2]) with col1: st.header("Définir l'emprise") xmin = st.number_input("X min", value=2600000.0) ymin = st.number_input("Y min", value=1200000.0) xmax = st.number_input("X max", value=2600100.0) ymax = st.number_input("Y max", value=1200100.0) with col2: st.header("Carte") m = folium.Map(location=[46.8, 8.2], zoom_start=8) Draw(draw_options={'polyline': False, 'polygon': False, 'circle': False, 'marker': False, 'circlemarker': False}, edit_options={'edit': False}).add_to(m) output = st_folium(m, width=700, height=500) if output['last_active_drawing']: coords = output['last_active_drawing']['geometry']['coordinates'][0] xmin, ymin = min(c[0] for c in coords), min(c[1] for c in coords) xmax, ymax = max(c[0] for c in coords), max(c[1] for c in coords) st.success("Emprise sélectionnée!") if st.button("Obtenir les données"): bbox = (xmin, ymin, xmax, ymax) urls = [] # MNT if mnt2m or mnt50cm: tri = '_2_' if mnt2m else '_0.5_' url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['mnt'] lst = [v for v in get_list_from_STAC_swisstopo(url, *bbox) if tri in v] urls += lst # MNS if mns: url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['mns'] lst = get_list_from_STAC_swisstopo(url, *bbox) urls += lst # Bâtiments 3D if bati3D: url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['bati3D'] lst = get_list_from_STAC_swisstopo(url, *bbox) lst = suppr_doublons_bati3D(lst) urls += lst # Bâtiments 3D v3 if bati3D_v3: url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['bati3D_v3'] lst = get_list_from_STAC_swisstopo(url, *bbox, gdb=True) urls += lst # Orthophoto if ortho2m or ortho10cm: tri = '_2_' if ortho2m else '_0.1_' url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['ortho'] lst = [v for v in get_list_from_STAC_swisstopo(url, *bbox) if tri in v] lst = suppr_doublons_list_ortho(lst) urls += lst if urls: st.write(f"Nombre de fichiers à télécharger : {len(urls)}") # Create a temporary directory for downloads with tempfile.TemporaryDirectory() as tmpdirname: progress_bar = st.progress(0) for i, url in enumerate(urls): filename = os.path.join(tmpdirname, url.split('/')[-1]) download_file(url, filename) if filename.endswith('.zip'): unzip_file(filename, tmpdirname) progress_bar.progress((i + 1) / len(urls)) st.success("Téléchargement terminé!") # Create a zip file of all downloaded content zip_filename = "swisstopo_data.zip" shutil.make_archive(zip_filename[:-4], 'zip', tmpdirname) with open(zip_filename, "rb") as fp: btn = st.download_button( label="Télécharger les données", data=fp, file_name=zip_filename, mime="application/zip" ) else: st.warning("Aucun fichier à télécharger pour les critères sélectionnés.") if __name__ == "__main__": main()