"""
coindesk_utils.py – Utilities for saving, merging, and managing CoinDesk data as
Parquet using StorageHandler.

Features:
- save_and_merge_parquet: Save new data, merge it with the existing Parquet object,
  drop duplicate rows, and keep only the last N days (filtered on a date column).
"""

import io
import logging
import os
from datetime import datetime, timedelta

import pandas as pd

from src.data_cloud.cloud_utils import StorageHandler

logger = logging.getLogger(__name__)


def _load_existing_parquet(storage: StorageHandler, key: str) -> pd.DataFrame:
    """Return the Parquet data stored at *key*, or an empty DataFrame if unavailable.

    This is best-effort. Any exception from ``storage.download`` is treated as
    "no existing data", because the backend's not-found exception type is not
    visible from this module. An empty or ``None`` payload is also treated as
    "no existing data". A payload that cannot be parsed as Parquet is logged and
    treated as empty; the caller will overwrite it.
    """
    try:
        raw = storage.download(key)
    except Exception as exc:  # backend-specific error types are unknown here
        logger.warning(
            "Could not download existing data for %r (%s); starting from new data only",
            key,
            exc,
        )
        return pd.DataFrame()

    if not raw:
        return pd.DataFrame()

    try:
        return pd.read_parquet(io.BytesIO(raw))
    except Exception:
        # Logged loudly: the caller's upload will replace this object.
        logger.warning(
            "Existing object at %r is not readable Parquet and will be overwritten",
            key,
            exc_info=True,
        )
        return pd.DataFrame()


def _utc_cutoff(dates: pd.Series, days: int) -> pd.Timestamp:
    """Return "now in UTC minus *days*" with the same tz-awareness as *dates*.

    If *dates* is tz-aware, an aware UTC timestamp is returned; pandas compares
    aware values across time zones correctly. Otherwise a naive timestamp
    holding the UTC wall-clock time is returned, matching what
    ``datetime.utcnow()`` produced. Comparing naive and aware values raises
    ``TypeError``, which is why the two cases are split.
    """
    cutoff = pd.Timestamp.now(tz="UTC") - pd.Timedelta(days=days)
    if isinstance(dates.dtype, pd.DatetimeTZDtype):
        return cutoff
    return cutoff.tz_localize(None)


def save_and_merge_parquet(
    storage: StorageHandler,
    key: str,
    new_data: pd.DataFrame,
    date_col: str = "timestamp",
    days: int = 7,
    content_type: str = "application/octet-stream",
) -> pd.DataFrame:
    """Merge *new_data* into the Parquet object at *key* and upload the result.

    Steps:
    1. Load the existing Parquet object at *key*, if any. This is best-effort:
       unreadable data is logged and treated as empty.
    2. Append *new_data* to the existing rows.
    3. If *date_col* exists, parse it with ``pd.to_datetime(errors="coerce")``.
       Then keep only rows whose date is at or after "now (UTC) - *days*", and
       sort by that column. Rows whose date is missing or unparseable (NaT) are
       dropped by this filter.
    4. Drop exact duplicate rows. Rows are compared on all columns, not on
       *date_col* alone.
    5. Write the result as Parquet (no index) and upload it to *key*.

    Args:
        storage: StorageHandler used for ``download(key)`` and
            ``upload(key, data, content_type=...)``.
        key: Storage key, e.g. ``'coindesk/spot_markets.parquet'``.
        new_data: Rows to add. The caller's DataFrame is not modified.
        date_col: Column used for the retention window and ordering.
            NOTE(review): naive timestamps are assumed to be in UTC. Integer
            epoch values are parsed by ``pd.to_datetime`` as nanoseconds, so
            epoch-second data should be converted before calling this — confirm
            the upstream format.
        days: Number of days of data to keep, counted back from now (UTC).
        content_type: MIME type passed to ``storage.upload``.

    Returns:
        The merged, filtered DataFrame that was uploaded.
    """
    df_old = _load_existing_parquet(storage, key)

    # Skip the empty placeholder frame: concatenating empty frames is deprecated
    # in pandas >= 2.1 and can affect the resulting dtypes.
    frames = [df_old, new_data] if not df_old.empty else [new_data]
    df_all = pd.concat(frames, ignore_index=True)

    if date_col in df_all.columns:
        df_all[date_col] = pd.to_datetime(df_all[date_col], errors="coerce")
        cutoff = _utc_cutoff(df_all[date_col], days)
        # NaT compares False, so rows with unparseable dates are dropped here too.
        df_all = df_all[df_all[date_col] >= cutoff]
        # A stable sort keeps existing rows ahead of new rows that share a timestamp.
        df_all = df_all.sort_values(date_col, kind="stable")
    else:
        logger.warning(
            "Column %r not found in data for %r; no retention window applied",
            date_col,
            key,
        )

    df_all = df_all.drop_duplicates()

    buf = io.BytesIO()
    df_all.to_parquet(buf, index=False)
    storage.upload(key, buf.getvalue(), content_type=content_type)
    return df_all