|
import streamlit as st |
|
import geopandas as gpd |
|
import folium |
|
from streamlit_folium import folium_static, st_folium |
|
from folium.plugins import Draw |
|
import requests |
|
from shapely.geometry import box |
|
import json |
|
from PIL import Image |
|
import io |
|
import numpy as np |
|
import tempfile |
|
import os |
|
import urllib.request |
|
import zipfile |
|
|
|
st.set_page_config(layout="wide", page_title="Extracteur de données géospatiales Swisstopo") |
|
|
|
|
|
URL_STAC_SWISSTOPO_BASE = 'https://data.geo.admin.ch/api/stac/v0.9/collections/' |
|
DIC_LAYERS = { |
|
'ortho': 'ch.swisstopo.swissimage-dop10', |
|
'mnt': 'ch.swisstopo.swissalti3d', |
|
'mns': 'ch.swisstopo.swisssurface3d-raster', |
|
'bati3D': 'ch.swisstopo.swissbuildings3d_2', |
|
'bati3D_v3': 'ch.swisstopo.swissbuildings3d_3_0', |
|
} |
|
|
|
FOLDER_NAME_SWISSTOPO = "swisstopo" |
|
NB_POLYGONES_MAX = 2000000 |
|
|
|
|
|
@st.cache_data |
|
def lv95towgs84(x, y): |
|
url = f'http://geodesy.geo.admin.ch/reframe/lv95towgs84?easting={x}&northing={y}&format=json' |
|
with urllib.request.urlopen(url) as f: |
|
json_res = json.loads(f.read().decode('utf-8')) |
|
return float(json_res['easting']), float(json_res['northing']) |
|
|
|
@st.cache_data |
|
def get_list_from_STAC_swisstopo(url, xmin, ymin, xmax, ymax, gdb=False): |
|
est, sud = lv95towgs84(xmin, ymin) |
|
ouest, nord = lv95towgs84(xmax, ymax) |
|
suffix_url = f"/items?bbox={est},{sud},{ouest},{nord}" |
|
url += suffix_url |
|
res = [] |
|
|
|
while url: |
|
with urllib.request.urlopen(url) as f: |
|
json_res = json.loads(f.read().decode('utf-8')) |
|
|
|
url = None |
|
for link in json_res.get('links', []): |
|
if link['rel'] == 'next': |
|
url = link['href'] |
|
|
|
for item in json_res['features']: |
|
for k, dic in item['assets'].items(): |
|
href = dic['href'] |
|
if gdb: |
|
if href.endswith('.gdb.zip') and len(href.split('/')[-1].split('_')) == 7: |
|
res.append(href) |
|
elif not href.endswith(('.xyz.zip', '.gdb.zip')): |
|
res.append(href) |
|
|
|
return res |
|
|
|
def suppr_doublons_list_ortho(lst): |
|
dic = {} |
|
for url in lst: |
|
nom, an, noflle, taille_px, epsg = url.split('/')[-1][:-4].split('_') |
|
dic.setdefault((noflle, float(taille_px)), []).append((an, url)) |
|
return [sorted(lst, reverse=True)[0][1] for noflle, lst in dic.items()] |
|
|
|
def suppr_doublons_bati3D(lst_url): |
|
dico = {} |
|
for dxf in [url for url in lst_url if url.endswith('.dxf.zip')]: |
|
*a, date, feuille = dxf.split('/')[-2].split('_') |
|
dico.setdefault(feuille, []).append((date, dxf)) |
|
return [sorted(liste, reverse=True)[0][1] for k, liste in dico.items()] |
|
|
|
def download_file(url, filename): |
|
with urllib.request.urlopen(url) as response, open(filename, 'wb') as out_file: |
|
out_file.write(response.read()) |
|
|
|
def unzip_file(zip_file, extract_to): |
|
with zipfile.ZipFile(zip_file, 'r') as zip_ref: |
|
zip_ref.extractall(extract_to) |
|
|
|
|
|
def main(): |
|
st.title("Extracteur de données Swisstopo") |
|
|
|
|
|
st.sidebar.header("Sélection des couches") |
|
mnt2m = st.sidebar.checkbox("MNT 2m") |
|
mnt50cm = st.sidebar.checkbox("MNT 50cm") |
|
mns = st.sidebar.checkbox("MNS (modèle numérique de surface)") |
|
bati3D = st.sidebar.checkbox("Bâtiments 3D") |
|
bati3D_v3 = st.sidebar.checkbox("Bâtiments 3D v3 (print 3D)") |
|
ortho2m = st.sidebar.checkbox("Orthophoto 2m") |
|
ortho10cm = st.sidebar.checkbox("Orthophoto 10cm") |
|
|
|
|
|
col1, col2 = st.columns([1, 2]) |
|
|
|
with col1: |
|
st.header("Définir l'emprise") |
|
xmin = st.number_input("X min", value=2600000.0) |
|
ymin = st.number_input("Y min", value=1200000.0) |
|
xmax = st.number_input("X max", value=2600100.0) |
|
ymax = st.number_input("Y max", value=1200100.0) |
|
|
|
with col2: |
|
st.header("Carte") |
|
m = folium.Map(location=[46.8, 8.2], zoom_start=8) |
|
Draw(draw_options={'polyline': False, 'polygon': False, 'circle': False, 'marker': False, 'circlemarker': False}, |
|
edit_options={'edit': False}).add_to(m) |
|
output = st_folium(m, width=700, height=500) |
|
|
|
if output['last_active_drawing']: |
|
coords = output['last_active_drawing']['geometry']['coordinates'][0] |
|
xmin, ymin = min(c[0] for c in coords), min(c[1] for c in coords) |
|
xmax, ymax = max(c[0] for c in coords), max(c[1] for c in coords) |
|
st.success("Emprise sélectionnée!") |
|
|
|
if st.button("Obtenir les données"): |
|
bbox = (xmin, ymin, xmax, ymax) |
|
urls = [] |
|
|
|
|
|
if mnt2m or mnt50cm: |
|
tri = '_2_' if mnt2m else '_0.5_' |
|
url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['mnt'] |
|
lst = [v for v in get_list_from_STAC_swisstopo(url, *bbox) if tri in v] |
|
urls += lst |
|
|
|
|
|
if mns: |
|
url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['mns'] |
|
lst = get_list_from_STAC_swisstopo(url, *bbox) |
|
urls += lst |
|
|
|
|
|
if bati3D: |
|
url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['bati3D'] |
|
lst = get_list_from_STAC_swisstopo(url, *bbox) |
|
lst = suppr_doublons_bati3D(lst) |
|
urls += lst |
|
|
|
|
|
if bati3D_v3: |
|
url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['bati3D_v3'] |
|
lst = get_list_from_STAC_swisstopo(url, *bbox, gdb=True) |
|
urls += lst |
|
|
|
|
|
if ortho2m or ortho10cm: |
|
tri = '_2_' if ortho2m else '_0.1_' |
|
url = URL_STAC_SWISSTOPO_BASE + DIC_LAYERS['ortho'] |
|
lst = [v for v in get_list_from_STAC_swisstopo(url, *bbox) if tri in v] |
|
lst = suppr_doublons_list_ortho(lst) |
|
urls += lst |
|
|
|
if urls: |
|
st.write(f"Nombre de fichiers à télécharger : {len(urls)}") |
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdirname: |
|
progress_bar = st.progress(0) |
|
for i, url in enumerate(urls): |
|
filename = os.path.join(tmpdirname, url.split('/')[-1]) |
|
download_file(url, filename) |
|
if filename.endswith('.zip'): |
|
unzip_file(filename, tmpdirname) |
|
progress_bar.progress((i + 1) / len(urls)) |
|
|
|
st.success("Téléchargement terminé!") |
|
|
|
|
|
zip_filename = "swisstopo_data.zip" |
|
shutil.make_archive(zip_filename[:-4], 'zip', tmpdirname) |
|
|
|
with open(zip_filename, "rb") as fp: |
|
btn = st.download_button( |
|
label="Télécharger les données", |
|
data=fp, |
|
file_name=zip_filename, |
|
mime="application/zip" |
|
) |
|
else: |
|
st.warning("Aucun fichier à télécharger pour les critères sélectionnés.") |
|
|
|
if __name__ == "__main__": |
|
main() |