""" main.py – Fetch CoinDesk On-Chain **and** AMM (Uniswap‑style) data ================================================================= Patched 2025‑07‑13 ------------------ * **Fixed** positional/keyword mismatch for `get_block`. * **Flatten + sanitize** CoinDesk AMM responses so Parquet writes succeed. * **Direct overwrite** for list/dict‑rich endpoints to prevent merge type errors. """ from __future__ import annotations import sys import os import argparse import logging import datetime as _dt import json as _json from typing import List, Optional, Any, Dict from dotenv import load_dotenv import pandas as pd # --------------------------------------------------------------------------- # Tier-locked endpoint skip flag # --------------------------------------------------------------------------- SKIP_TIER_LOCKED = os.getenv("COINDESK_SKIP_TIER_LOCKED", "true").lower() in ("1", "true", "yes") # --------------------------------------------------------------------------- # Path bootstrap – ensure project root is import‑able # --------------------------------------------------------------------------- SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) PROJECT_ROOT = os.path.abspath(os.path.join(SCRIPT_DIR, "..", "..", "..")) if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) # --------------------------------------------------------------------------- # Local imports (resolved after path bootstrap) # --------------------------------------------------------------------------- from onchain import OnChainClient, normalize_data # noqa: E402 from src.data_cloud.cloud_utils import StorageHandler # noqa: E402 from src.fetchers.coindesk_client.coindesk_utils import save_and_merge_parquet # noqa: E402 # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- CHAIN_ASSET_MAP: Dict[str, int] = { "ETH": 2, "BSC": 8, "BTC": 1, "BASE": 2410, "ARB": 808, } # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _flatten_records(resp: Any, id_field: str = "id") -> pd.DataFrame: """Flatten dict‑of‑dict → rows DataFrame; else defer to normalize_data().""" if isinstance(resp, dict) and all(isinstance(v, dict) for v in resp.values()): return pd.DataFrame([{id_field: k, **v} for k, v in resp.items()]) return normalize_data(resp) def _sanitize_for_parquet(df: pd.DataFrame) -> pd.DataFrame: """Convert any nested dict/list columns to JSON strings for Arrow compatibility.""" for col in df.columns: if df[col].dtype == "object": df[col] = df[col].apply(lambda x: _json.dumps(x) if isinstance(x, (dict, list)) else str(x)) return df def _save_merge(storage: StorageHandler, filename: str, df: pd.DataFrame, *, date_col: str, days: int): """Sanitize then merge new df into history via save_and_merge_parquet().""" if df.empty: logger.debug("→ %s empty, skip merge", filename) return df = _sanitize_for_parquet(df) save_and_merge_parquet(storage, filename, df, date_col=date_col, days=days) logger.info("✔ Merged %s (%d rows)", filename, len(df)) def _save_overwrite(storage: StorageHandler, filename: str, 
def _save_overwrite(storage: StorageHandler, filename: str, df: pd.DataFrame):
    """Sanitize then overwrite local Parquet—bypass merge to avoid mixed types."""
    if df.empty:
        logger.debug("→ %s empty, skip overwrite", filename)
        return
    df = _sanitize_for_parquet(df)
    local_dir = storage.local_base
    path = os.path.join(local_dir, filename)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    df.to_parquet(path, index=False)
    logger.info("✔ Overwrote %s (%d rows)", filename, len(df))

# ---------------------------------------------------------------------------
# On-chain batch
# ---------------------------------------------------------------------------

def fetch_onchain_all(
    onchain: OnChainClient,
    storage: StorageHandler,
    symbols: List[str],
    days_old: int,
    block_configs: List[dict],
):
    # Address metadata – overwrite to prevent nested-list merges
    for sym in symbols:
        chain_sym, address = sym.split("-", 1)
        chain_id = CHAIN_ASSET_MAP.get(chain_sym)
        try:
            logger.info("→ Address metadata %s on %s", address, chain_sym)
            resp = onchain.get_address_metadata(chain_id, address).get("Data", {})
            df = pd.DataFrame([resp])
            _save_overwrite(storage, f"{sym}_address_metadata.parquet", df)
        except Exception:
            logger.exception("✗ Address metadata %s", sym)

        # Asset-by-address – overwrite for list-rich fields
        try:
            logger.info("→ Asset-by-address %s on %s", address, chain_sym)
            resp = onchain.get_data_by_address(
                chain_asset=chain_sym,
                address=address,
                asset_lookup_priority="SYMBOL",
                quote_asset="USD",
            ).get("Data", {})
            df = normalize_data(resp)
            _save_overwrite(storage, f"{sym}_data_by_address.parquet", df)
        except Exception as e:
            if getattr(getattr(e, "response", None), "status_code", None) == 404:
                logger.warning("→ Asset-by-address unsupported for %s", sym)
            else:
                logger.exception("✗ Asset-by-address %s", sym)

    # Historical supply – safe merge
    for chain_sym in {s.split("-", 1)[0] for s in symbols}:
        # ── Historical supply (premium) ──
        if SKIP_TIER_LOCKED:
            logger.info("← Skipping historical supply for %s (tier-locked)", chain_sym)
        else:
            try:
                logger.info("→ Supply days %s", chain_sym)
                resp = onchain.get_historical_supply_days(
                    asset=chain_sym,
                    asset_lookup_priority="SYMBOL",
                    quote_asset="USD",
                ).get("Data", {})
                df = normalize_data(resp)
                _save_merge(storage, f"{chain_sym}_historical_supply_days.parquet", df,
                            date_col="timestamp", days=days_old)
            except Exception as e:
                if getattr(getattr(e, "response", None), "status_code", None) == 401:
                    logger.warning("→ Supply tier-locked for %s", chain_sym)
                else:
                    logger.exception("✗ Supply days %s", chain_sym)

    # Summary by chain – overwrite nested struct
    for chain_sym in {s.split("-", 1)[0] for s in symbols}:
        try:
            logger.info("→ Chain summary %s", chain_sym)
            resp = onchain.get_summary_by_chain(
                chain_asset=chain_sym,
                asset_lookup_priority="SYMBOL",
            ).get("Data", {})
            df = pd.DataFrame([resp])
            _save_overwrite(storage, f"{chain_sym}_chain_summary.parquet", df)
        except Exception:
            logger.exception("✗ Chain summary %s", chain_sym)

    # Block data – safe merge
    for cfg in block_configs:
        ca, bn, groups = cfg["chain_asset"], cfg["block_number"], cfg["groups"]
        try:
            logger.info("→ Block %s:%s", ca, bn)
            resp = onchain.get_block(ca, bn, groups=groups).get("Data", {})
            df = pd.DataFrame([resp])
            _save_merge(storage, f"block_{ca}_{bn}.parquet", df,
                        date_col="timestamp", days=days_old)
        except Exception:
            logger.exception("✗ Block %s:%s", ca, bn)
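
# Example (illustrative sketch only; assumes valid API credentials): calling
# the on-chain batch directly for one WETH address and one ETH block. The
# values mirror the defaults wired up in fetch_all() below; chain_asset ids
# follow CHAIN_ASSET_MAP above (ETH == 2).
#
#     fetch_onchain_all(
#         onchain, storage,
#         symbols=["ETH-0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"],
#         days_old=7,
#         block_configs=[{"chain_asset": 2, "block_number": 19501436,
#                         "groups": ["ID", "METADATA", "TRANSACTIONS"]}],
#     )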
# ---------------------------------------------------------------------------
# AMM batch
# ---------------------------------------------------------------------------

def fetch_amm_all(
    onchain: OnChainClient,
    storage: StorageHandler,
    *,
    market: str,
    instruments: List[str],
    days_old: int,
    pairs: Optional[List[str]] = None,
):
    logger.info("=== AMM %s – %s ===", market, ", ".join(instruments))

    # Latest tick – safe merge
    try:
        tick = onchain.get_latest_swap_tick(market=market, instruments=instruments).get("Data", {})
        df = _flatten_records(tick, "instrument")
        _save_merge(storage, f"{market}_latest_swap_tick.parquet", df,
                    date_col="timestamp", days=days_old)
    except Exception:
        logger.exception("✗ Latest tick %s", market)

    # Historical OHLCV – safe merge
    for inst in instruments:
        try:
            hist = onchain.get_historical_swap_days(
                market=market,
                instrument=inst,
                limit=30,
                aggregate=1,
                fill=True,
            ).get("Data", {})
            df = normalize_data(hist)
            _save_merge(storage, f"{inst}_historical_swap_days.parquet", df,
                        date_col="timestamp", days=days_old)
        except Exception:
            logger.exception("✗ OHLCV %s", inst)

    # Hourly messages – safe merge with warning. Use an aware UTC datetime:
    # calling .timestamp() on a naive utcnow() value is interpreted as *local*
    # time and yields the wrong epoch on non-UTC machines.
    hour_ts = int(
        _dt.datetime.now(_dt.timezone.utc)
        .replace(minute=0, second=0, microsecond=0)
        .timestamp()
    )
    for inst in instruments:
        # ── Swap messages (premium) ──
        if SKIP_TIER_LOCKED:
            logger.info("← Skipping swap-messages for %s (tier-locked)", inst)
        else:
            try:
                swaps = onchain.get_swap_messages_hour(
                    market=market, instrument=inst, hour_ts=hour_ts
                ).get("Data", {})
                df = normalize_data(swaps)
                _save_merge(storage, f"{inst}_swap_messages_{hour_ts}.parquet", df,
                            date_col="timestamp", days=days_old)
            except Exception as e:
                if getattr(getattr(e, "response", None), "status_code", None) == 401:
                    logger.warning("→ swap-messages tier-locked for %s", inst)
                else:
                    logger.exception("✗ swap messages %s", inst)

        # ── Liquidity updates (premium) ──
        # Skip up front, mirroring swap-messages: checking SKIP_TIER_LOCKED
        # only inside the except handler would still issue the request and
        # mask genuine errors behind a "skipping" message.
        if SKIP_TIER_LOCKED:
            logger.info("← Skipping liquidity-updates for %s (tier-locked)", inst)
        else:
            try:
                liq = onchain.get_liquidity_update_messages_hour(
                    market=market, instrument=inst, hour_ts=hour_ts
                ).get("Data", {})
                df = normalize_data(liq)
                _save_merge(storage, f"{inst}_liquidity_updates_{hour_ts}.parquet", df,
                            date_col="timestamp", days=days_old)
            except Exception as e:
                if getattr(getattr(e, "response", None), "status_code", None) == 401:
                    logger.warning("→ liquidity-updates tier-locked for %s", inst)
                else:
                    logger.exception("✗ liquidity updates %s", inst)

    # Instrument metadata – safe merge
    try:
        meta = onchain.get_latest_instrument_metadata(market=market, instruments=instruments).get("Data", {})
        df = _flatten_records(meta, "instrument")
        _save_merge(storage, f"{market}_instrument_metadata.parquet", df,
                    date_col="timestamp", days=days_old)
    except Exception:
        logger.exception("✗ Instrument metadata %s", market)

    # Market overview – safe merge
    try:
        mkts = onchain.get_amm_markets(market=market).get("Data", {})
        df = _flatten_records(mkts, "market")
        _save_merge(storage, f"{market}_markets.parquet", df,
                    date_col="timestamp", days=days_old)
    except Exception:
        logger.exception("✗ Markets %s", market)

    # Optional pairs listing – safe merge
    if pairs:
        try:
            lst = onchain.get_amm_markets_instruments(market=market, instruments=pairs).get("Data", {})
            df = _flatten_records(lst, "pair")
            _save_merge(storage, f"{market}_markets_instruments.parquet", df,
                        date_col="timestamp", days=days_old)
        except Exception:
            logger.exception("✗ Markets+instruments %s", market)
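
# Example (illustrative sketch only; assumes a valid COINDESK_API_KEY is set):
# fetching AMM data for a single Uniswap v2 pool outside the CLI flow. The
# "<pool-address>_2" instrument format matches the defaults in fetch_all() below.
#
#     client = OnChainClient(api_key=os.getenv("COINDESK_API_KEY"),
#                            base_url="https://data-api.coindesk.com/")
#     store = StorageHandler(endpoint_url=None, access_key=None, secret_key=None,
#                            bucket_name=None, local_base="data/coindesk/onchain")
#     fetch_amm_all(client, store, market="uniswapv2",
#                   instruments=["0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852_2"],
#                   days_old=7)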
os.getenv("COINDESK_API_KEY") ) print("Using API key:", api_key) host = cfg.get("host") or os.getenv("COINDESK_API_HOST", "data-api.coindesk.com") base_url = f"https://{host}/" days_old = int(cfg.get("days") or os.getenv("COINDESK_DAYS_OLD", 7)) symbols_arg = cfg.get("symbols") or os.getenv("COINDESK_SYMBOLS", "ETH-0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2") symbols = [s.strip() for s in symbols_arg.split(",") if s.strip()] amm_market = cfg.get("amm_market") or os.getenv("COINDESK_AMM_MARKET", "uniswapv2") amm_instruments_arg = cfg.get("amm_instruments") or os.getenv("COINDESK_AMM_INSTRUMENTS", "0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852_2,0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc_2") amm_instruments = [s.strip() for s in amm_instruments_arg.split(",") if s.strip()] amm_pairs_arg = cfg.get("amm_pairs") or os.getenv("COINDESK_AMM_PAIRS", "WETH-USDC,WETH-USDT") amm_pairs = [p.strip() for p in amm_pairs_arg.split(",") if p.strip()] block_configs = [ {"chain_asset": 2, "block_number": 19501436, "groups": ["ID", "METADATA", "TRANSACTIONS"]}, {"chain_asset": 8, "block_number": 33459930, "groups": ["ID", "METADATA", "TRANSACTIONS"]}, {"chain_asset": 1, "block_number": 840946, "groups": ["ID", "METADATA", "TRANSACTIONS"]}, {"chain_asset": 2410, "block_number": 17014740, "groups": ["ID", "METADATA", "TRANSACTIONS"]}, {"chain_asset": 808, "block_number": 284999999,"groups": ["ID", "METADATA", "TRANSACTIONS"]}, ] onchain = OnChainClient(api_key=api_key, base_url=base_url) storage = StorageHandler( endpoint_url=None, access_key=None, secret_key=None, bucket_name=None, local_base="data/coindesk/onchain", ) # ------------------------------------------------------------------ # Execute batches # ------------------------------------------------------------------ logger.info("=== Fetching on-chain data ===") fetch_onchain_all(onchain, storage, symbols, days_old, block_configs) logger.info("=== Fetching AMM (%s) data ===", amm_market) fetch_amm_all( onchain, storage, market=amm_market, instruments=amm_instruments, days_old=days_old, pairs=amm_pairs, ) # --------------------------------------------------------------------------- # CLI wrapper # --------------------------------------------------------------------------- if __name__ == "__main__": parser = argparse.ArgumentParser(description="Fetch CoinDesk On-Chain & AMM data") parser.add_argument("--symbols", help="comma-separated chain-symbol addresses (e.g. 'ETH-0x...,BTC-...')") parser.add_argument("--days", type=int, help="merge window in days (default 7)") parser.add_argument("--api-key", help="CoinDesk API key") parser.add_argument("--host", help="API host override") # AMM extras ------------------------------------------------------ parser.add_argument("--amm-market", help="AMM market (e.g. 'uniswapv2')") parser.add_argument("--amm-instruments", help="comma-separated instrument addresses") parser.add_argument("--amm-pairs", help="comma-separated token pairs for markets+instruments") args = parser.parse_args() cfg = {k: v for k, v in vars(args).items() if v is not None} # Fallbacks to env handled inside fetch_all fetch_all(cfg)