Spaces:
Sleeping
Sleeping
import ibis | |
import ibis.selectors as s | |
from ibis import _ | |
import fiona | |
import geopandas as gpd | |
import rioxarray | |
from shapely.geometry import box | |
con = ibis.duckdb.connect() | |
con.load_extension("spatial") | |
threads = -1 | |
agency_name = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-agency-name.parquet").select(manager_name_id = "Code", manager_name = "Dom") | |
agency_type = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-agency-type.parquet").select(manager_type_id = "Code", manager_type = "Dom") | |
desig_type = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-desgination-type.parquet").select(designation_type_id = "Code", designation_type = "Dom") | |
public_access = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-public-access.parquet").select(public_access_id = "Code", public_access = "Dom") | |
state_name = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-state-name.parquet").select(state = "Code", state_name = "Dom") | |
iucn = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-iucn.parquet").select(iucn_code = "CODE", iucn_category = "DOM") | |
fgb = "https://data.source.coop/cboettig/pad-us-3/pad-us3-combined.fgb" | |
parquet = "https://data.source.coop/cboettig/pad-us-3/pad-us3-combined.parquet" | |
# gdb = "https://data.source.coop/cboettig/pad-us-3/PADUS3/PAD_US3_0.gdb" # original, all tables | |
# or read the fgb version, much slower | |
# pad = con.read_geo(fgb) | |
# pad = con.read_parquet(parquet) | |
# Currently ibis doesn't detect that this is GeoParquet. We need a SQL escape-hatch to cast the geometry | |
con.raw_sql(f"CREATE OR REPLACE VIEW pad AS SELECT *, st_geomfromwkb(geometry) as geom from read_parquet('{parquet}')") | |
pad = con.table("pad") | |
# Get the CRS | |
# fiona is not built with parquet support, must read this from fgb. ideally duckdb's st_read_meta would do this from the parquet | |
meta = fiona.open(fgb) | |
crs = meta.crs | |
## optional getting bounds | |
# extract bounds. (in this case these are already in the same projection actually so r.rio.bounds() would work) | |
r = rioxarray.open_rasterio("https://data.source.coop/cboettig/mobi/species-richness-all/SpeciesRichness_All.tif") | |
bounds = box(*r.rio.transform_bounds(crs)) | |
# + | |
# Now we can do all the usual SQL queries to subset the data. Note the `geom.within()` spatial filter! | |
focal_columns = ["row_n", "FeatClass", "Mang_Name", | |
"Mang_Type", "Des_Tp", "Pub_Access", | |
"GAP_Sts", "IUCN_Cat", "Unit_Nm", | |
"State_Nm", "EsmtHldr", "Date_Est", | |
"SHAPE_Area", "geom"] | |
pad_parquet = ( | |
pad | |
.mutate(row_n=ibis.row_number()) | |
.filter((_.FeatClass.isin(["Easement", "Fee"])) # | ((_.FeatClass == "Proclamation") & (_.Mang_Name == "TRIB")) | |
) | |
# .filter(_.geom.within(bounds)) | |
.select(focal_columns) | |
.rename(geometry="geom") | |
) | |
# Need to revist this to also process the external polygons | |
# .filter(~ _.geom.within(bounds)) | |
pad_parquet.to_parquet("pad-processed.parquet") | |
# + | |
# Add our custom bucket categories: | |
# really could be done seperately. | |
categorical_columns = ["bucket", "FeatClass", "Mang_Name", | |
"Mang_Type", "Des_Tp", "Pub_Access", | |
"GAP_Sts", "IUCN_Cat", "Unit_Nm", | |
"State_Nm", "EsmtHldr", "Date_Est", | |
"row_n"] | |
public = ["DIST", "LOC", "FED", "STAT", "JNT"] | |
case = ( | |
ibis.case() | |
.when( (_.Mang_Type.isin(public) & _.GAP_Sts.isin(["1","2"])), "public conservation") | |
.when( (_.Mang_Type.isin(public) & _.GAP_Sts.isin(["3"])), "mixed use") | |
.when( (_.Mang_Type.isin(public) & _.GAP_Sts.isin(["4"])), "public unprotected") | |
.when( (_.Mang_Type.isin(["PVT", "NGO"]) & (_.GAP_Sts.isin(["1","2", "3"]))), "private conservation") | |
.when( (_.Mang_Type.isin(["PVT", "NGO"]) & (_.GAP_Sts.isin(["4"]))), "private unprotected") | |
.when( (_.Mang_Type == "TRIB"), "tribal") | |
.end() | |
) | |
pad_grouping = ( | |
pad | |
.mutate(row_n=ibis.row_number()) | |
.filter((_.FeatClass.isin(["Easement", "Fee"])) | ( | |
(_.FeatClass == "Proclamation") & (_.Mang_Name == "TRIB")) | |
) | |
.mutate(bucket = case) | |
.select(categorical_columns) | |
.rename(manager_name_id = "Mang_Name", | |
manager_type_id = "Mang_Type", | |
designation_type_id = "Des_Tp", | |
public_access_id = "Pub_Access", | |
category = "FeatClass", | |
iucn_code = "IUCN_Cat", | |
gap_code = "GAP_Sts", | |
state = "State_Nm", | |
easement_holder = "EsmtHldr", | |
date_established = "Date_Est", | |
area_name = "Unit_Nm") | |
.left_join(agency_name, "manager_name_id") | |
.left_join(agency_type, "manager_type_id") | |
.left_join(desig_type, "designation_type_id") | |
.left_join(public_access, "public_access_id") | |
.left_join(state_name, "state") | |
.left_join(iucn, "iucn_code") | |
.select(~s.contains("_right")) | |
) | |
pad_grouping.to_parquet("pad-groupings.parquet") | |
# - | |
(pad_parquet | |
.rename(manager_name_id = "Mang_Name", | |
manager_type_id = "Mang_Type", | |
designation_type_id = "Des_Tp", | |
public_access_id = "Pub_Access", | |
category = "FeatClass", | |
iucn_code = "IUCN_Cat", | |
gap_code = "GAP_Sts", | |
state = "State_Nm", | |
easement_holder = "EsmtHldr", | |
date_established = "Date_Est", | |
area_square_meters = "SHAPE_Area", | |
area_name = "Unit_Nm") | |
.left_join(agency_name, "manager_name_id") | |
.left_join(agency_type, "manager_type_id") | |
.left_join(desig_type, "designation_type_id") | |
.left_join(public_access, "public_access_id") | |
.left_join(state_name, "state") | |
.left_join(iucn, "iucn_code") | |
.select(~s.contains("_right")) | |
# .select(~s.contains("_id")) | |
# if we keep the original geoparquet WKB 'geometry' column, to_pandas() (or execute) gives us only a normal pandas data.frame, and geopandas doesn't see the metadata. | |
# if we replace the geometry with duckdb-native 'geometry' type, to_pandas() gives us a geopanadas! But requires reading into RAM. | |
.to_pandas() | |
.set_crs(crs) | |
.to_parquet("pad-processed.parquet") | |
) | |
# + | |
import rasterio | |
from rasterstats import zonal_stats | |
import geopandas as gpd | |
import pandas as pd | |
from joblib import Parallel, delayed | |
def big_zonal_stats(vec_file, tif_file, stats, col_name, n_jobs, verbose = 10, timeout=10000): | |
# read in vector as geopandas, match CRS to raster | |
with rasterio.open(tif_file) as src: | |
raster_profile = src.profile | |
gdf = gpd.read_parquet(vec_file).to_crs(raster_profile['crs']) | |
# row_n is a global id, may refer to excluded polygons | |
# gdf["row_id"] = gdf.index + 1 | |
# lamba fn to zonal_stats a slice: | |
def get_stats(geom_slice, tif_file, stats): | |
stats = zonal_stats(geom_slice.geometry, tif_file, stats = stats) | |
stats[0]['row_n'] = geom_slice.row_n | |
return stats[0] | |
# iteratation (could be a list comprehension?) | |
jobs = [] | |
for r in gdf.itertuples(): | |
jobs.append(delayed(get_stats)(r, tif_file, stats)) | |
# And here we go | |
output = Parallel(n_jobs=n_jobs, timeout=timeout, verbose=verbose)(jobs) | |
# reshape output | |
df = ( | |
pd.DataFrame(output) | |
.rename(columns={'mean': col_name}) | |
.merge(gdf, how='right', on = 'row_n') | |
) | |
gdf = gpd.GeoDataFrame(df, geometry="geometry") | |
return gdf | |
# - | |
import geopandas as gpd | |
test = gpd.read_parquet("pad-processed.parquet") | |
test.columns | |
# + | |
# %%time | |
tif_file = "/home/rstudio/boettiger-lab/us-pa-policy/hfp_2021_100m_v1-2_cog.tif" | |
vec_file = './pad-processed.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], | |
col_name = "human_impact", n_jobs=threads, verbose=0) | |
gpd.GeoDataFrame(df, geometry="geometry").to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/cboettig/mobi/species-richness-all/SpeciesRichness_All.tif' | |
vec_file = './pad-stats.parquet' | |
big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "richness", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/cboettig/mobi/range-size-rarity-all/RSR_All.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], | |
col_name = "rsr", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/deforest_carbon_100m_cog.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], | |
col_name = "deforest_carbon", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_bii_100m_cog.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], | |
col_name = "biodiversity_intactness_loss", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_fii_100m_cog.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], | |
col_name = "forest_integrity_loss", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_expansion_100m_cog.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "crop_expansion", n_jobs=threads, verbose=0) | |
gpd.GeoDataFrame(df, geometry="geometry").to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_reduction_100m_cog.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "crop_reduction", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/cboettig/carbon/cogs/irrecoverable_c_total_2018.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "irrecoverable_carbon", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/source.coop/cboettig/carbon/cogs/manageable_c_total_2018.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "manageable_carbon", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/minio/shared-biodiversity/redlist/cog/combined_rwr_2022.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "all_species_rwr", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
# %%time | |
tif_file = '/home/rstudio/minio/shared-biodiversity/redlist/cog/combined_sr_2022.tif' | |
vec_file = './pad-stats.parquet' | |
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "all_species_richness", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet") | |
# + | |
columns = ''' | |
area_name, | |
manager_name, | |
manager_name_id, | |
manager_type, | |
manager_type_id, | |
designation_type, | |
designation_type_id, | |
public_access, | |
category, | |
iucn_code, | |
iucn_category, | |
gap_code, | |
state, | |
state_name, | |
easement_holder, | |
date_established, | |
area_square_meters, | |
geometry, | |
all_species_richness, | |
all_species_rwr, | |
manageable_carbon, | |
irrecoverable_carbon, | |
crop_reduction, | |
crop_expansion, | |
deforest_carbon, | |
richness, | |
rsr, | |
forest_integrity_loss, | |
biodiversity_intactness_loss | |
''' | |
items = columns.split(',') | |
# Remove empty strings and whitespace | |
items = [item.strip() for item in items if item.strip()] | |
items | |
# - | |
import ibis | |
from ibis import _ | |
df = ibis.read_parquet("pad-stats.parquet").select(items) | |
df.group_by(_.manager_type).aggregate(n = _.manager_type.count()).to_pandas() | |
# + | |
## create pad.duckdb | |
from sqlalchemy import create_engine | |
from sqlalchemy import text | |
db_uri = "duckdb:///pad.duckdb" | |
engine = create_engine(db_uri) | |
con = engine.connect() | |
con.execute(f"create or replace table pad as select {columns} from 'pad-stats.parquet'") | |
con.close() | |
# pad_stats = ibis.read_parquet("pad-stats.parquet") | |
# pad_stats.head(20).to_pandas() | |
# - | |
import pandas as pd | |
db_uri = "duckdb:///pad.duckdb" | |
engine = create_engine(db_uri) | |
con = engine.connect() | |
pd.DataFrame(con.execute("select * from pad limit 1").fetchall()) | |