Maaroufabousaleh
f
c49b21b
"""
onchain.py – CoinDesk Data API On-Chain & AMM endpoints.
This client wraps the publicly-documented CoinDesk **/onchain/** routes, now including
Automated Market Maker (AMM) queries for Uniswap-style DEXs.
Provided functionality
----------------------
* Processed block data (multi-chain)
* Address metadata & asset summaries
* Historical supply-day metrics
* **NEW – AMM endpoints**
Β· Latest swap tick (price/volume snapshot)
Β· Historical OHLCV+ for swaps (daily aggregation)
Β· Raw swap messages (per-hour granularity)
Β· Liquidity-update messages (per-hour granularity)
Β· Instrument metadata
Β· Market & instrument discovery
All helper methods return the raw `requests.Response` JSON. You can pass the output
through `normalize_data()` to obtain a tidy *pandas* `DataFrame`.
Example
~~~~~~~
>>> client = OnChainClient(api_key="YOUR_COIN_DESK_KEY")
>>> df = normalize_data(
... client.get_latest_swap_tick(
... market="uniswapv2",
... instruments=[
... "0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852_2", # WETH/USDT
... "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc_2", # WETH/USDC
... ]
... )
... )
>>> df.head()
Dependencies
------------
* `pandas` (tabular manipulation)
* `requests` (via BaseClient)
"""
from __future__ import annotations
from typing import Any, List, Optional, Dict
import pandas as pd
from client import BaseClient # ← must expose ._get()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def normalize_data(raw: Any) -> pd.DataFrame:
"""Best-effort conversion of API *raw* JSON into a :class:`pandas.DataFrame`. Handles
the three typical response shapes returned by CoinDesk:
* **list[dict]** – directly convertible via :pyclass:`pandas.DataFrame`
* **dict[str, list]** where all lists are equal length – idem
* **dict[str, Any]** heterogeneous – wrapped in a single-row DataFrame
"""
if isinstance(raw, list):
return pd.DataFrame(raw)
if isinstance(raw, dict):
try:
return pd.DataFrame(raw)
except ValueError: # unequal length sequences β†’ single row
return pd.DataFrame([raw])
# Fallback – unknown shape
return pd.DataFrame()
# ---------------------------------------------------------------------------
# Main client
# ---------------------------------------------------------------------------
class OnChainClient(BaseClient):
"""Typed thin wrapper around the CoinDesk On-Chain REST API."""
# ---------------------------------------------------------------------
# Core (already present) ------------------------------------------------
# ---------------------------------------------------------------------
def get_block(self, chain_asset: int, block_number: int, *, groups: List[str]):
"""Processed block data for *chain_asset* at *block_number*.
``groups`` is a list such as ``["ID", "METADATA", "TRANSACTIONS"]``.
Maps to ``/onchain/v1/block/{chain_asset}``.
"""
return self._get(
f"onchain/v1/block/{chain_asset}",
params={"block_number": block_number, "groups": ",".join(groups)},
)
def get_address_metadata(self, chain_asset: int, address: str):
"""Rich metadata for an *address* on *chain_asset*."""
return self._get(
f"onchain/v1/address/metadata/{chain_asset}", params={"address": address}
)
def get_summary_by_chain(self, chain_asset: str, *, asset_lookup_priority: str = "SYMBOL"):
"""Summary view of assets for a blockchain network."""
return self._get(
"onchain/v3/summary/by/chain",
params={
"chain_asset": chain_asset,
"asset_lookup_priority": asset_lookup_priority,
},
)
def get_data_by_address(
self,
chain_asset: str,
address: str,
*,
asset_lookup_priority: str = "SYMBOL",
quote_asset: str = "USD",
):
"""Look-up asset data (balance, value, etc.) by *address*."""
return self._get(
"onchain/v2/data/by/address",
params={
"chain_asset": chain_asset,
"address": address,
"asset_lookup_priority": asset_lookup_priority,
"quote_asset": quote_asset,
},
)
def get_historical_supply_days(
self,
asset: str,
*,
asset_lookup_priority: str = "SYMBOL",
quote_asset: Optional[str] = None,
):
"""Daily historical supply for *asset* – available for major networks."""
params: Dict[str, str] = {
"asset": asset,
"asset_lookup_priority": asset_lookup_priority,
}
if quote_asset:
params["quote_asset"] = quote_asset
return self._get("onchain/v2/historical/supply/days", params=params)
# ---------------------------------------------------------------------
# AMM (new) ------------------------------------------------------------
# ---------------------------------------------------------------------
# Helpers – convert booleans to lower-case strings required by API
_bool = staticmethod(lambda x: str(bool(x)).lower())
def get_latest_swap_tick(
self,
*,
market: str,
instruments: List[str],
instrument_lookup_strategy: str = "ALL_OPTIONS",
apply_mapping: bool = True,
):
"""Latest tick (price, volume, liquidity) for one or many *instruments* on an AMM
*market* (e.g. ``"uniswapv2"``).
**Endpoint** ``/onchain/v1/amm/latest/swap/tick``
"""
return self._get(
"onchain/v1/amm/latest/swap/tick",
params={
"market": market,
"instruments": ",".join(instruments),
"instrument_lookup_strategy": instrument_lookup_strategy,
"apply_mapping": self._bool(apply_mapping),
},
)
def get_historical_swap_days(
self,
*,
market: str,
instrument: str,
limit: int = 30,
aggregate: int = 1,
fill: bool = True,
instrument_lookup_strategy: str = "ALL_OPTIONS",
apply_mapping: bool = True,
):
"""Daily OHLCV+ history for a swap *instrument* (e.g. LP address _tokenId).
**Endpoint** ``/onchain/v1/amm/historical/swap/days``
"""
return self._get(
"onchain/v1/amm/historical/swap/days",
params={
"market": market,
"instrument": instrument,
"limit": limit,
"aggregate": aggregate,
"fill": self._bool(fill),
"instrument_lookup_strategy": instrument_lookup_strategy,
"apply_mapping": self._bool(apply_mapping),
},
)
def get_swap_messages_hour(
self,
*,
market: str,
instrument: str,
hour_ts: int,
instrument_lookup_strategy: str = "ALL_OPTIONS",
apply_mapping: bool = True,
):
"""Raw swap messages (mints/burns/swaps) for a given *hour_ts* (UNIX seconds).
**Endpoint** ``/onchain/v2/amm/historical/swap-messages/hour``
"""
return self._get(
"onchain/v2/amm/historical/swap-messages/hour",
params={
"market": market,
"instrument": instrument,
"hour_ts": hour_ts,
"instrument_lookup_strategy": instrument_lookup_strategy,
"apply_mapping": self._bool(apply_mapping),
},
)
def get_liquidity_update_messages_hour(
self,
*,
market: str,
instrument: str,
hour_ts: int,
instrument_lookup_strategy: str = "ALL_OPTIONS",
apply_mapping: bool = True,
):
"""Liquidity add/remove messages for the specified *hour_ts*.
**Endpoint** ``/onchain/v2/amm/historical/liquidity-update-messages/hour``
"""
return self._get(
"onchain/v2/amm/historical/liquidity-update-messages/hour",
params={
"market": market,
"instrument": instrument,
"hour_ts": hour_ts,
"instrument_lookup_strategy": instrument_lookup_strategy,
"apply_mapping": self._bool(apply_mapping),
},
)
def get_latest_instrument_metadata(
self,
*,
market: str,
instruments: List[str],
instrument_lookup_strategy: str = "ALL_OPTIONS",
apply_mapping: bool = True,
):
"""Token-pair metadata (decimals, symbols, etc.) for *instruments*.
**Endpoint** ``/onchain/v1/amm/latest/instrument/metadata``
"""
return self._get(
"onchain/v1/amm/latest/instrument/metadata",
params={
"market": market,
"instruments": ",".join(instruments),
"instrument_lookup_strategy": instrument_lookup_strategy,
"apply_mapping": self._bool(apply_mapping),
},
)
# ------------------------------------------------------------------
# Market discovery
# ------------------------------------------------------------------
def get_amm_markets(self, *, market: str):
"""List details about an AMM *market* (e.g. pools count, TVL)."""
return self._get("onchain/v1/amm/markets", params={"market": market})
def get_amm_markets_instruments(
self,
*,
market: str,
instruments: List[str],
instrument_status: str = "ACTIVE",
instrument_lookup_strategy: str = "ALL_OPTIONS",
):
"""Enumerate instruments on an AMM *market* filtered by *instrument_status*.
**Endpoint** ``/onchain/v1/amm/markets/instruments``
"""
return self._get(
"onchain/v1/amm/markets/instruments",
params={
"market": market,
"instruments": ",".join(instruments),
"instrument_status": instrument_status,
"instrument_lookup_strategy": instrument_lookup_strategy,
},
)