"""
main.py – Fetch CoinDesk On-Chain **and** AMM (Uniswap-style) data
=================================================================
Patched 2025-07-13
------------------
* **Fixed** positional/keyword mismatch for `get_block`.
* **Flatten + sanitize** CoinDesk AMM responses so Parquet writes succeed.
* **Direct overwrite** for list/dict-rich endpoints to prevent merge type errors.
"""
from __future__ import annotations
import sys
import os
import argparse
import logging
import datetime as _dt
import json as _json
from typing import List, Optional, Any, Dict
from dotenv import load_dotenv
import pandas as pd
# ---------------------------------------------------------------------------
# Tier-locked endpoint skip flag
# ---------------------------------------------------------------------------
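# When true, endpoints that require a paid CoinDesk tier (historical supply,
# swap messages, liquidity updates) are skipped outright instead of each
# request failing with HTTP 401.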
SKIP_TIER_LOCKED = os.getenv("COINDESK_SKIP_TIER_LOCKED", "true").lower() in ("1", "true", "yes")
# ---------------------------------------------------------------------------
# Path bootstrap – ensure the project root is importable
# ---------------------------------------------------------------------------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)
# ---------------------------------------------------------------------------
# Local imports (resolved after path bootstrap)
# ---------------------------------------------------------------------------
from onchain import OnChainClient, normalize_data # noqa: E402
from src.data_cloud.cloud_utils import StorageHandler # noqa: E402
from src.fetchers.coindesk_client.coindesk_utils import save_and_merge_parquet # noqa: E402
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
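# Chain symbol → CoinDesk chain-asset ID. The same numeric IDs are hard-coded
# in block_configs inside fetch_all().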
CHAIN_ASSET_MAP: Dict[str, int] = {
    "ETH": 2,
    "BSC": 8,
    "BTC": 1,
    "BASE": 2410,
    "ARB": 808,
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
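# _flatten_records turns a {key: {fields...}} payload into one row per key.
# For a hypothetical response {"0xabc": {"price": 1.0}} with id_field
# "instrument", the result has columns ["instrument", "price"] and one row.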
def _flatten_records(resp: Any, id_field: str = "id") -> pd.DataFrame:
    """Flatten dict-of-dict → rows DataFrame; else defer to normalize_data()."""
    if isinstance(resp, dict) and all(isinstance(v, dict) for v in resp.values()):
        return pd.DataFrame([{id_field: k, **v} for k, v in resp.items()])
    return normalize_data(resp)
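# Arrow/Parquet cannot serialize object columns that mix dicts or lists with
# scalars; JSON-encoding nested values keeps every object column string-typed.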
def _sanitize_for_parquet(df: pd.DataFrame) -> pd.DataFrame:
    """Convert any nested dict/list columns to JSON strings for Arrow compatibility."""
    for col in df.columns:
        if df[col].dtype == "object":
            df[col] = df[col].apply(lambda x: _json.dumps(x) if isinstance(x, (dict, list)) else str(x))
    return df
def _save_merge(storage: StorageHandler, filename: str, df: pd.DataFrame, *, date_col: str, days: int):
    """Sanitize then merge new df into history via save_and_merge_parquet()."""
    if df.empty:
        logger.debug("→ %s empty, skip merge", filename)
        return
    df = _sanitize_for_parquet(df)
    save_and_merge_parquet(storage, filename, df, date_col=date_col, days=days)
    logger.info("✔ Merged %s (%d rows)", filename, len(df))
def _save_overwrite(storage: StorageHandler, filename: str, df: pd.DataFrame):
    """Sanitize then overwrite the local Parquet file, bypassing merge to avoid mixed types."""
    if df.empty:
        logger.debug("→ %s empty, skip overwrite", filename)
        return
    df = _sanitize_for_parquet(df)
    local_dir = storage.local_base
    path = os.path.join(local_dir, filename)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    df.to_parquet(path, index=False)
    logger.info("✔ Overwrote %s (%d rows)", filename, len(df))
# ---------------------------------------------------------------------------
# On-chain batch
# ---------------------------------------------------------------------------
def fetch_onchain_all(
    onchain: OnChainClient,
    storage: StorageHandler,
    symbols: List[str],
    days_old: int,
    block_configs: List[dict],
):
    # Address metadata – overwrite to prevent nested-list merges
    for sym in symbols:
        chain_sym, address = sym.split("-", 1)
        chain_id = CHAIN_ASSET_MAP.get(chain_sym)
        try:
            logger.info("→ Address metadata %s on %s", address, chain_sym)
            resp = onchain.get_address_metadata(chain_id, address).get("Data", {})
            df = pd.DataFrame([resp])
            _save_overwrite(storage, f"{sym}_address_metadata.parquet", df)
        except Exception:
            logger.exception("✗ Address metadata %s", sym)
        # Asset-by-address – overwrite for list-rich fields
        try:
            logger.info("→ Asset-by-address %s on %s", address, chain_sym)
            resp = onchain.get_data_by_address(
                chain_asset=chain_sym,
                address=address,
                asset_lookup_priority="SYMBOL",
                quote_asset="USD",
            ).get("Data", {})
            df = normalize_data(resp)
            _save_overwrite(storage, f"{sym}_data_by_address.parquet", df)
        except Exception as e:
            if getattr(getattr(e, "response", None), "status_code", None) == 404:
                logger.warning("→ Asset-by-address unsupported for %s", sym)
            else:
                logger.exception("✗ Asset-by-address %s", sym)
    # Historical supply – safe merge
    for chain_sym in {s.split("-", 1)[0] for s in symbols}:
        # ── Historical supply (premium) ──
        if SKIP_TIER_LOCKED:
            logger.info("← Skipping historical supply for %s (tier-locked)", chain_sym)
        else:
            try:
                logger.info("→ Supply days %s", chain_sym)
                resp = onchain.get_historical_supply_days(
                    asset=chain_sym,
                    asset_lookup_priority="SYMBOL",
                    quote_asset="USD",
                ).get("Data", {})
                df = normalize_data(resp)
                _save_merge(storage, f"{chain_sym}_historical_supply_days.parquet", df, date_col="timestamp", days=days_old)
            except Exception as e:
                if getattr(getattr(e, "response", None), "status_code", None) == 401:
                    logger.warning("→ Supply tier-locked for %s", chain_sym)
                else:
                    logger.exception("✗ Supply days %s", chain_sym)
    # Summary by chain – overwrite nested struct
    for chain_sym in {s.split("-", 1)[0] for s in symbols}:
        try:
            logger.info("→ Chain summary %s", chain_sym)
            resp = onchain.get_summary_by_chain(
                chain_asset=chain_sym,
                asset_lookup_priority="SYMBOL",
            ).get("Data", {})
            df = pd.DataFrame([resp])
            _save_overwrite(storage, f"{chain_sym}_chain_summary.parquet", df)
        except Exception:
            logger.exception("✗ Chain summary %s", chain_sym)
    # Block data – safe merge
    for cfg in block_configs:
        ca, bn, groups = cfg["chain_asset"], cfg["block_number"], cfg["groups"]
        try:
            logger.info("→ Block %s:%s", ca, bn)
            resp = onchain.get_block(ca, bn, groups=groups).get("Data", {})
            df = pd.DataFrame([resp])
            _save_merge(storage, f"block_{ca}_{bn}.parquet", df, date_col="timestamp", days=days_old)
        except Exception:
            logger.exception("✗ Block %s:%s", ca, bn)
# ---------------------------------------------------------------------------
# AMM batch
# ---------------------------------------------------------------------------
def fetch_amm_all(
    onchain: OnChainClient,
    storage: StorageHandler,
    *,
    market: str,
    instruments: List[str],
    days_old: int,
    pairs: Optional[List[str]] = None,
):
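    # Every endpoint below is fetched in its own try/except, so one failing
    # call logs an error and the rest of the batch still runs.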
logger.info("=== AMM %s – %s ===", market, ", ".join(instruments))
# Latest tick – safe merge
try:
tick = onchain.get_latest_swap_tick(market=market, instruments=instruments).get("Data", {})
df = _flatten_records(tick, "instrument")
_save_merge(storage, f"{market}_latest_swap_tick.parquet", df, date_col="timestamp", days=days_old)
except Exception:
logger.exception("✗ Latest tick %s", market)
# Historical OHLCV – safe merge
for inst in instruments:
try:
hist = onchain.get_historical_swap_days(
market=market,
instrument=inst,
limit=30,
aggregate=1,
fill=True,
).get("Data", {})
df = normalize_data(hist)
_save_merge(storage, f"{inst}_historical_swap_days.parquet", df, date_col="timestamp", days=days_old)
except Exception:
logger.exception("✗ OHLCV %s", inst)
    # Hourly messages – safe merge with warning
    # Use an aware-UTC datetime: naive utcnow().timestamp() is interpreted in
    # local time and would shift the epoch on non-UTC hosts.
    hour_ts = int(
        _dt.datetime.now(_dt.timezone.utc)
        .replace(minute=0, second=0, microsecond=0)
        .timestamp()
    )
    for inst in instruments:
        # ── Swap messages (premium) ──
        if SKIP_TIER_LOCKED:
            logger.info("← Skipping swap-messages for %s (tier-locked)", inst)
        else:
            try:
                swaps = onchain.get_swap_messages_hour(market=market, instrument=inst, hour_ts=hour_ts).get("Data", {})
                df = normalize_data(swaps)
                _save_merge(storage, f"{inst}_swap_messages_{hour_ts}.parquet", df, date_col="timestamp", days=days_old)
            except Exception as e:
                if getattr(getattr(e, "response", None), "status_code", None) == 401:
                    logger.warning("→ swap-messages tier-locked for %s", inst)
                else:
                    logger.exception("✗ swap messages %s", inst)
        # ── Liquidity updates (premium) ── skipped entirely when tier-locked,
        # like swap messages above, rather than issuing the request anyway.
        if SKIP_TIER_LOCKED:
            logger.info("← Skipping liquidity-updates for %s (tier-locked)", inst)
        else:
            try:
                liq = onchain.get_liquidity_update_messages_hour(market=market, instrument=inst, hour_ts=hour_ts).get("Data", {})
                df = normalize_data(liq)
                _save_merge(storage, f"{inst}_liquidity_updates_{hour_ts}.parquet", df, date_col="timestamp", days=days_old)
            except Exception as e:
                if getattr(getattr(e, "response", None), "status_code", None) == 401:
                    logger.warning("→ liquidity-updates tier-locked for %s", inst)
                else:
                    logger.exception("✗ liquidity updates %s", inst)
    # Instrument metadata – safe merge
    try:
        meta = onchain.get_latest_instrument_metadata(market=market, instruments=instruments).get("Data", {})
        df = _flatten_records(meta, "instrument")
        _save_merge(storage, f"{market}_instrument_metadata.parquet", df, date_col="timestamp", days=days_old)
    except Exception:
        logger.exception("✗ Instrument metadata %s", market)
    # Market overview – safe merge
    try:
        mkts = onchain.get_amm_markets(market=market).get("Data", {})
        df = _flatten_records(mkts, "market")
        _save_merge(storage, f"{market}_markets.parquet", df, date_col="timestamp", days=days_old)
    except Exception:
        logger.exception("✗ Markets %s", market)
    # Optional pairs listing – safe merge
    if pairs:
        try:
            lst = onchain.get_amm_markets_instruments(market=market, instruments=pairs).get("Data", {})
            df = _flatten_records(lst, "pair")
            _save_merge(storage, f"{market}_markets_instruments.parquet", df, date_col="timestamp", days=days_old)
        except Exception:
            logger.exception("✗ Markets+instruments %s", market)
# ---------------------------------------------------------------------------
# Orchestrator & CLI
# ---------------------------------------------------------------------------
def fetch_all(config: Dict[str, Any] | None = None):
    load_dotenv()
    cfg = config or {}
    # Accept both 'api_key' and 'api-key' spellings in the config dict, then
    # fall back to the environment.
    api_key = (
        cfg.get("api_key")
        or cfg.get("api-key")
        or os.getenv("COINDESK_API_KEY")
    )
print("Using API key:", api_key)
    host = cfg.get("host") or os.getenv("COINDESK_API_HOST", "data-api.coindesk.com")
    base_url = f"https://{host}/"
    days_old = int(cfg.get("days") or os.getenv("COINDESK_DAYS_OLD", 7))
    symbols_arg = cfg.get("symbols") or os.getenv("COINDESK_SYMBOLS", "ETH-0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2")
    symbols = [s.strip() for s in symbols_arg.split(",") if s.strip()]
    amm_market = cfg.get("amm_market") or os.getenv("COINDESK_AMM_MARKET", "uniswapv2")
    amm_instruments_arg = cfg.get("amm_instruments") or os.getenv("COINDESK_AMM_INSTRUMENTS", "0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852_2,0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc_2")
    amm_instruments = [s.strip() for s in amm_instruments_arg.split(",") if s.strip()]
    amm_pairs_arg = cfg.get("amm_pairs") or os.getenv("COINDESK_AMM_PAIRS", "WETH-USDC,WETH-USDT")
    amm_pairs = [p.strip() for p in amm_pairs_arg.split(",") if p.strip()]
    block_configs = [
        {"chain_asset": 2, "block_number": 19501436, "groups": ["ID", "METADATA", "TRANSACTIONS"]},
        {"chain_asset": 8, "block_number": 33459930, "groups": ["ID", "METADATA", "TRANSACTIONS"]},
        {"chain_asset": 1, "block_number": 840946, "groups": ["ID", "METADATA", "TRANSACTIONS"]},
        {"chain_asset": 2410, "block_number": 17014740, "groups": ["ID", "METADATA", "TRANSACTIONS"]},
        {"chain_asset": 808, "block_number": 284999999, "groups": ["ID", "METADATA", "TRANSACTIONS"]},
    ]
    onchain = OnChainClient(api_key=api_key, base_url=base_url)
    storage = StorageHandler(
        endpoint_url=None,
        access_key=None,
        secret_key=None,
        bucket_name=None,
        local_base="data/coindesk/onchain",
    )
    # ------------------------------------------------------------------
    # Execute batches
    # ------------------------------------------------------------------
    logger.info("=== Fetching on-chain data ===")
    fetch_onchain_all(onchain, storage, symbols, days_old, block_configs)
    logger.info("=== Fetching AMM (%s) data ===", amm_market)
    fetch_amm_all(
        onchain,
        storage,
        market=amm_market,
        instruments=amm_instruments,
        days_old=days_old,
        pairs=amm_pairs,
    )
# ---------------------------------------------------------------------------
# CLI wrapper
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fetch CoinDesk On-Chain & AMM data")
    parser.add_argument("--symbols", help="comma-separated chain-symbol addresses (e.g. 'ETH-0x...,BTC-...')")
    parser.add_argument("--days", type=int, help="merge window in days (default 7)")
    parser.add_argument("--api-key", help="CoinDesk API key")
    parser.add_argument("--host", help="API host override")
    # AMM extras ------------------------------------------------------
    parser.add_argument("--amm-market", help="AMM market (e.g. 'uniswapv2')")
    parser.add_argument("--amm-instruments", help="comma-separated instrument addresses")
    parser.add_argument("--amm-pairs", help="comma-separated token pairs for markets+instruments")
    args = parser.parse_args()
    cfg = {k: v for k, v in vars(args).items() if v is not None}
    # Env fallbacks are handled inside fetch_all
    fetch_all(cfg)