Streamlit / pages /12_🌲_VertXtractor.py
Vertdure's picture
Update pages/12_🌲_VertXtractor.py
57fb324 verified
raw
history blame
No virus
7.46 kB
import streamlit as st
import geopandas as gpd
import folium
from streamlit_folium import folium_static, st_folium
from folium.plugins import Draw
import requests
from shapely.geometry import box
import json
import io
import rasterio
from rasterio.io import MemoryFile
import numpy as np
import os
# Configuration de la page
st.set_page_config(layout="wide", page_title="Extracteur de données géospatiales")
# Définition des couches disponibles
LAYERS = {
"Swisstopo - SWISSIMAGE 10 cm": {"id": "ch.swisstopo.swissimage-dop10", "source": "swisstopo", "type": "image"},
"Swisstopo - Carte nationale 1:25'000": {"id": "ch.swisstopo.pixelkarte-farbe-pk25.noscale", "source": "swisstopo", "type": "image"},
"Swisstopo - MNT": {"id": "ch.swisstopo.swissalti3d", "source": "swisstopo", "type": "raster"},
"ESRI - World Imagery": {"id": "WorldImagery", "source": "esri", "type": "image"},
"ESRI - World Elevation": {"id": "WorldElevation", "source": "esri", "type": "raster"}
}
def draw_box_on_map():
"""Crée une carte interactive pour dessiner une zone d'intérêt."""
m = folium.Map(location=[46.8, 8.2], zoom_start=8)
Draw(draw_options={'polyline': False, 'polygon': False, 'circle': False, 'marker': False, 'circlemarker': False},
edit_options={'edit': False}).add_to(m)
return st_folium(m, width=700, height=500)
def create_geojson_from_box(bbox):
"""Crée un GeoJSON à partir d'une boîte englobante."""
gdf = gpd.GeoDataFrame({'geometry': [bbox]}, crs="EPSG:4326")
return json.loads(gdf.to_json()), json.loads(gdf.to_crs("EPSG:2056").to_json())
@st.cache_data
def extract_swisstopo_data(bbox, layer_id, layer_type):
"""Extrait les données de Swisstopo."""
api_url = f"https://data.geo.admin.ch/api/stac/v0.9/collections/{layer_id}/items"
params = {"bbox": ",".join(map(str, bbox)), "limit": 1}
try:
response = requests.get(api_url, params=params)
response.raise_for_status()
data = response.json()
if data['features']:
feature = data['features'][0]
asset_keys = feature['assets'].keys()
data_key = 'rgb' if 'rgb' in asset_keys else 'data' if 'data' in asset_keys else next(iter(asset_keys))
data_url = feature['assets'][data_key]['href']
return requests.get(data_url).content
except requests.RequestException as e:
st.error(f"Erreur Swisstopo: {str(e)}")
return None
@st.cache_data
def extract_esri_data(bbox, layer_id, layer_type):
"""Extrait les données d'ESRI."""
service_url = "https://services.arcgisonline.com/arcgis/rest/services/World_Imagery/MapServer/export" if layer_type == "image" else "https://elevation.arcgis.com/arcgis/rest/services/WorldElevation/Terrain/ImageServer/exportImage"
params = {
"bbox": f"{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}",
"bboxSR": 4326,
"size": "1000,1000",
"format": "tiff",
"f": "image",
"imageSR": 4326
}
try:
response = requests.get(service_url, params=params)
response.raise_for_status()
return response.content
except requests.RequestException as e:
st.error(f"Erreur ESRI: {str(e)}")
return None
def process_and_save_data(data, layer_name, layer_type):
"""Traite et sauvegarde les données extraites."""
if data is None:
st.warning(f"Pas de données pour {layer_name}")
return
try:
with MemoryFile(data) as memfile:
with memfile.open() as src:
# Créer un dossier pour sauvegarder les fichiers
output_folder = "extracted_data"
os.makedirs(output_folder, exist_ok=True)
# Nom du fichier de sortie
output_filename = os.path.join(output_folder, f"{layer_name.lower().replace(' ', '_')}.tif")
# Écrire les données dans un nouveau fichier GeoTIFF
profile = src.profile
if layer_type == "image" and src.count == 4: # Pour les images RGBA
profile.update(count=3) # On ne garde que les 3 premières bandes (RGB)
with rasterio.open(output_filename, 'w', **profile) as dst:
if layer_type == "image" and src.count == 4:
dst.write(src.read()[0:3, :, :]) # Écrire seulement les bandes RGB
else:
dst.write(src.read())
st.success(f"Données sauvegardées : {output_filename}")
except Exception as e:
st.error(f"Erreur de traitement pour {layer_name}: {str(e)}")
def main():
st.title("Extracteur de données Swisstopo et ESRI")
# Sélecteur de couches
selected_layers = st.multiselect("Couches à extraire", list(LAYERS.keys()))
# Création de deux colonnes
col1, col2 = st.columns([1, 3])
# Colonne de gauche pour les informations
with col1:
st.markdown("---")
st.markdown("## À propos\nExtracteur de données géospatiales Swisstopo et ESRI.\nDéveloppé par Vertdure")
# Colonne de droite pour la carte et les résultats
with col2:
st.subheader("1. Définir la zone d'intérêt")
method = st.radio("Méthode", ["Dessiner", "GeoJSON"], horizontal=True)
if method == "Dessiner":
map_data = draw_box_on_map()
if map_data['last_active_drawing']:
coords = map_data['last_active_drawing']['geometry']['coordinates'][0]
bbox = box(min(c[0] for c in coords), min(c[1] for c in coords),
max(c[0] for c in coords), max(c[1] for c in coords))
st.session_state['geojson_wgs84'], st.session_state['geojson_swiss'] = create_geojson_from_box(bbox)
st.success("Zone sélectionnée!")
else:
uploaded_file = st.file_uploader("GeoJSON", type="geojson")
if uploaded_file:
gdf = gpd.read_file(uploaded_file)
st.session_state['geojson_wgs84'] = json.loads(gdf.to_crs(4326).to_json())
st.session_state['geojson_swiss'] = json.loads(gdf.to_crs(2056).to_json())
st.success("GeoJSON chargé!")
if st.button("Extraire les données"):
if 'geojson_wgs84' not in st.session_state or 'geojson_swiss' not in st.session_state:
st.error("Définissez d'abord une zone d'intérêt.")
elif not selected_layers:
st.error("Sélectionnez au moins une couche à extraire.")
else:
with st.spinner("Extraction des données en cours..."):
for layer_name in selected_layers:
layer_info = LAYERS[layer_name]
bbox = gpd.GeoDataFrame.from_features(st.session_state['geojson_swiss' if layer_info["source"] == "swisstopo" else 'geojson_wgs84']).total_bounds
st.info(f"Traitement de la couche : {layer_name}")
data = extract_swisstopo_data(bbox, layer_info["id"], layer_info["type"]) if layer_info["source"] == "swisstopo" else extract_esri_data(bbox, layer_info["id"], layer_info["type"])
process_and_save_data(data, layer_name, layer_info["type"])
st.success("Extraction terminée !")
if __name__ == "__main__":
main()