|
""" |
|
Optimized CoinDesk API Client with Smart Market Discovery and Endpoint Compatibility |
|
Enhanced version with improved error handling and market validation |
|
""" |
|
|
|
import argparse |
|
import json |
|
import os |
|
from client import BaseClient, APIError |
|
from typing import Union, List, Optional, Dict, Tuple |
|
from datetime import datetime, timedelta |
|
import time |
|
from dataclasses import dataclass |
|
from enum import Enum |
|
import sys |
|
|
|
def safe_print(*args, **kwargs): |
|
"""Prints unicode safely even if the terminal encoding is not UTF-8.""" |
|
text = " ".join(str(arg) for arg in args) |
|
try: |
|
sys.stdout.buffer.write((text + '\n').encode('utf-8', errors='replace')) |
|
except Exception: |
|
|
|
print(text.encode('ascii', errors='replace').decode('ascii'), **kwargs) |
|
|
|
import config |
|
|
|
import pathlib |
|
|
|
def ensure_data_dir(): |
|
"""Ensure the data directory exists.""" |
|
data_dir = pathlib.Path("dta/coindesk/indexes") |
|
data_dir.mkdir(parents=True, exist_ok=True) |
|
return data_dir |
|
|
|
def save_json_result(filename: str, data: dict): |
|
"""Save data as JSON to the dta/coindesk/indexes directory.""" |
|
data_dir = ensure_data_dir() |
|
file_path = data_dir / filename |
|
with open(file_path, "w", encoding="utf-8") as f: |
|
json.dump(data, f, ensure_ascii=False, indent=2) |
|
|
|
class EndpointStatus(Enum): |
|
SUPPORTED = "supported" |
|
UNSUPPORTED = "unsupported" |
|
UNKNOWN = "unknown" |
|
|
|
@dataclass |
|
class MarketInfo: |
|
"""Market information with endpoint compatibility""" |
|
market_id: str |
|
name: str |
|
endpoints: Dict[str, EndpointStatus] |
|
instruments: List[str] |
|
last_checked: datetime |
|
|
|
class IndexClient(BaseClient): |
|
""" |
|
Enhanced Index & Reference Rates endpoints for CoinDesk Data API. |
|
Includes smart market discovery and endpoint compatibility checking. |
|
""" |
|
|
|
def __init__(self): |
|
super().__init__() |
|
self._market_cache = {} |
|
self._endpoint_compatibility = {} |
|
|
|
def list_markets(self) -> dict: |
|
"""List all available markets (index families).""" |
|
return self.get("/index/cc/v1/markets") |
|
|
|
def list_markets_instruments(self, |
|
market: str, |
|
instruments: Optional[List[str]] = None, |
|
instrument_status: str = "ACTIVE" |
|
) -> dict: |
|
""" |
|
List instruments for a given market. If instruments is None, |
|
retrieves *all* mapped instruments from the API. |
|
""" |
|
params = { |
|
"market": market, |
|
"instrument_status": instrument_status |
|
} |
|
if instruments: |
|
params["instruments"] = ",".join(instruments) |
|
return self.get("/index/cc/v1/markets/instruments", params=params) |
|
|
|
def get_latest_tick(self, market: str, instruments: List[str], |
|
apply_mapping: bool = True) -> dict: |
|
""" |
|
Latest OHLCV+ tick data. |
|
|
|
Args: |
|
market: Index family identifier (e.g., 'sda', 'cdifti') |
|
instruments: List of instrument tickers (e.g., ['XBX-USD', 'ETX-USD']) |
|
apply_mapping: Whether to apply instrument mapping |
|
""" |
|
if not instruments: |
|
raise ValueError("The 'instruments' parameter is required") |
|
|
|
params = { |
|
'market': market, |
|
'instruments': ','.join(instruments), |
|
'apply_mapping': str(apply_mapping).lower() |
|
} |
|
|
|
return self.get("/index/cc/v1/latest/tick", params=params) |
|
|
|
def get_historical_days(self, market: str, instrument: str, limit: int = 30, |
|
aggregate: int = 1, fill: bool = True, |
|
apply_mapping: bool = True, response_format: str = "JSON") -> dict: |
|
""" |
|
Historical OHLCV+ by day. |
|
|
|
Args: |
|
market: Index family identifier (e.g., 'sda', 'cdifti') |
|
instrument: Single instrument ticker (e.g., 'XBX-USD') |
|
limit: Number of days to retrieve |
|
aggregate: Aggregation period |
|
fill: Whether to fill missing data |
|
apply_mapping: Whether to apply instrument mapping |
|
response_format: Response format |
|
""" |
|
if not instrument: |
|
raise ValueError("The 'instrument' parameter is required") |
|
|
|
params = { |
|
'market': market, |
|
'instrument': instrument, |
|
'limit': limit, |
|
'aggregate': aggregate, |
|
'fill': str(fill).lower(), |
|
'apply_mapping': str(apply_mapping).lower(), |
|
'response_format': response_format |
|
} |
|
|
|
return self.get("/index/cc/v1/historical/days", params=params) |
|
|
|
def get_latest_instrument_metadata(self, market: str, instruments: List[str], |
|
apply_mapping: bool = True) -> dict: |
|
""" |
|
Latest instrument metadata. |
|
|
|
Args: |
|
market: Index family identifier (e.g., 'sda', 'cdifti') |
|
instruments: List of instrument tickers (e.g., ['XBX-USD', 'ETX-USD']) |
|
apply_mapping: Whether to apply instrument mapping |
|
""" |
|
if not instruments: |
|
raise ValueError("The 'instruments' parameter is required") |
|
|
|
params = { |
|
'market': market, |
|
'instruments': ','.join(instruments), |
|
'apply_mapping': str(apply_mapping).lower() |
|
} |
|
|
|
return self.get("/index/cc/v1/latest/instrument/metadata", params=params) |
|
|
|
def list_eod_markets_instruments(self, market: str, instruments: List[str] = None, |
|
instrument_status: str = "ACTIVE") -> dict: |
|
""" |
|
List EOD (unmapped) instruments - most reliable for instrument discovery. |
|
|
|
Args: |
|
market: Index family identifier (e.g., 'cdifti') |
|
instruments: Optional list of instruments to filter |
|
instrument_status: Status filter (default: 'ACTIVE') |
|
""" |
|
params = { |
|
'market': market, |
|
'instrument_status': instrument_status |
|
} |
|
if instruments: |
|
params['instruments'] = ','.join(instruments) |
|
|
|
return self.get("/index/cc/v1/markets/instruments/unmapped/eod", params=params) |
|
|
|
def get_historical_days_eod(self, market: str, instrument: str, limit: int = 5, |
|
response_format: str = "JSON") -> dict: |
|
""" |
|
EOD historical OHLCV+ by day. |
|
|
|
Args: |
|
market: Index family identifier (e.g., 'cdifti') |
|
instrument: Single instrument ticker |
|
limit: Number of days to retrieve |
|
response_format: Response format |
|
""" |
|
params = { |
|
'market': market, |
|
'instrument': instrument, |
|
'limit': limit, |
|
'response_format': response_format |
|
} |
|
|
|
return self.get("/index/cc/v1/historical/days/eod", params=params) |
|
|
|
def check_endpoint_compatibility(self, market: str) -> Dict[str, EndpointStatus]: |
|
""" |
|
Check which endpoints are supported for a specific market. |
|
|
|
Args: |
|
market: Market identifier to check |
|
|
|
Returns: |
|
Dictionary mapping endpoint names to their support status |
|
""" |
|
if market in self._endpoint_compatibility: |
|
return self._endpoint_compatibility[market] |
|
|
|
endpoints = {} |
|
test_instruments = ["BTC-USD", "ETH-USD", "XBX-USD"] |
|
|
|
|
|
try: |
|
self.list_eod_markets_instruments(market=market) |
|
endpoints["eod_instruments"] = EndpointStatus.SUPPORTED |
|
except APIError as e: |
|
endpoints["eod_instruments"] = EndpointStatus.UNSUPPORTED if e.status_code == 400 else EndpointStatus.UNKNOWN |
|
except Exception: |
|
endpoints["eod_instruments"] = EndpointStatus.UNKNOWN |
|
|
|
|
|
try: |
|
|
|
instruments = self.discover_instruments_for_market(market, silent=True) |
|
if instruments: |
|
self.list_markets_instruments(market=market, instruments=instruments[:2]) |
|
endpoints["mapped_instruments"] = EndpointStatus.SUPPORTED |
|
else: |
|
endpoints["mapped_instruments"] = EndpointStatus.UNKNOWN |
|
except APIError as e: |
|
endpoints["mapped_instruments"] = EndpointStatus.UNSUPPORTED if e.status_code == 400 else EndpointStatus.UNKNOWN |
|
except Exception: |
|
endpoints["mapped_instruments"] = EndpointStatus.UNKNOWN |
|
|
|
|
|
try: |
|
instruments = self.discover_instruments_for_market(market, silent=True) |
|
if instruments: |
|
self.get_latest_tick(market=market, instruments=instruments[:2]) |
|
endpoints["tick_data"] = EndpointStatus.SUPPORTED |
|
else: |
|
endpoints["tick_data"] = EndpointStatus.UNKNOWN |
|
except APIError as e: |
|
endpoints["tick_data"] = EndpointStatus.UNSUPPORTED if e.status_code in [400, 404] else EndpointStatus.UNKNOWN |
|
except Exception: |
|
endpoints["tick_data"] = EndpointStatus.UNKNOWN |
|
|
|
|
|
try: |
|
instruments = self.discover_instruments_for_market(market, silent=True) |
|
if instruments: |
|
self.get_historical_days(market=market, instrument=instruments[0], limit=1) |
|
endpoints["historical_data"] = EndpointStatus.SUPPORTED |
|
else: |
|
endpoints["historical_data"] = EndpointStatus.UNKNOWN |
|
except APIError as e: |
|
endpoints["historical_data"] = EndpointStatus.UNSUPPORTED if e.status_code in [400, 404] else EndpointStatus.UNKNOWN |
|
except Exception: |
|
endpoints["historical_data"] = EndpointStatus.UNKNOWN |
|
|
|
|
|
try: |
|
instruments = self.discover_instruments_for_market(market, silent=True) |
|
if instruments: |
|
self.get_latest_instrument_metadata(market=market, instruments=instruments[:2]) |
|
endpoints["metadata"] = EndpointStatus.SUPPORTED |
|
else: |
|
endpoints["metadata"] = EndpointStatus.UNKNOWN |
|
except APIError as e: |
|
endpoints["metadata"] = EndpointStatus.UNSUPPORTED if e.status_code in [400, 404] else EndpointStatus.UNKNOWN |
|
except Exception: |
|
endpoints["metadata"] = EndpointStatus.UNKNOWN |
|
|
|
self._endpoint_compatibility[market] = endpoints |
|
return endpoints |
|
|
|
def discover_markets_with_compatibility(self) -> List[MarketInfo]: |
|
""" |
|
Discover all markets with their endpoint compatibility and instruments. |
|
|
|
Returns: |
|
List of MarketInfo objects with full compatibility information |
|
""" |
|
safe_print("π Discovering markets with endpoint compatibility...") |
|
|
|
try: |
|
resp = self.list_markets() |
|
raw_markets = resp.get('Data', []) |
|
|
|
if not raw_markets: |
|
safe_print("β No markets found in API response") |
|
return [] |
|
|
|
market_infos = [] |
|
|
|
for entry in raw_markets: |
|
if isinstance(entry, dict): |
|
market_id = entry.get('market') |
|
market_name = entry.get('name') |
|
else: |
|
market_id = entry |
|
market_name = None |
|
|
|
if not market_id: |
|
continue |
|
|
|
safe_print(f"\nπ Analyzing market: {market_id} ({market_name or 'Unknown'})") |
|
|
|
|
|
endpoints = self.check_endpoint_compatibility(market_id) |
|
|
|
|
|
instruments = self.discover_instruments_for_market(market_id) |
|
|
|
|
|
market_info = MarketInfo( |
|
market_id=market_id, |
|
name=market_name or market_id, |
|
endpoints=endpoints, |
|
instruments=instruments, |
|
last_checked=datetime.now() |
|
) |
|
|
|
market_infos.append(market_info) |
|
|
|
|
|
supported_count = sum(1 for status in endpoints.values() if status == EndpointStatus.SUPPORTED) |
|
total_count = len(endpoints) |
|
safe_print(f" β
Supported endpoints: {supported_count}/{total_count}") |
|
safe_print(f" π§ Available instruments: {len(instruments)}") |
|
|
|
return market_infos |
|
|
|
except Exception as e: |
|
safe_print(f"β Error discovering markets: {e}") |
|
return [] |
|
|
|
def discover_instruments_for_market(self, market: str, silent: bool = False) -> List[str]: |
|
""" |
|
Discover available instruments for a specific market using multiple approaches. |
|
|
|
Args: |
|
market: Market identifier (e.g., 'sda', 'cdifti') |
|
silent: If True, suppress output messages |
|
|
|
Returns: |
|
List of available instrument tickers |
|
""" |
|
if not silent: |
|
safe_print(f"π Discovering instruments for market '{market}'...") |
|
|
|
|
|
try: |
|
eod = self.list_eod_markets_instruments(market=market) |
|
data = eod.get("Data", {}).get(market, {}).get("instruments", {}) |
|
if data: |
|
instruments = list(data.keys()) |
|
if not silent: |
|
safe_print(f" β
{len(instruments)} via EOD") |
|
return instruments |
|
except Exception as e: |
|
if not silent: |
|
safe_print(f" β οΈ EOD failed: {e}") |
|
|
|
|
|
common = ["BTC-USD", "ETH-USD", "XBX-USD", "ETX-USD"] |
|
try: |
|
meta = self.get_latest_instrument_metadata(market, common) |
|
if meta.get("Data"): |
|
instruments = list(meta["Data"].keys()) |
|
if not silent: |
|
safe_print(f" β
{len(instruments)} via metadata") |
|
return instruments |
|
except Exception as e: |
|
if not silent: |
|
safe_print(f" β οΈ Metadata failed: {e}") |
|
|
|
|
|
try: |
|
mapped = self.list_markets_instruments(market=market) |
|
data = mapped.get("Data", {}).get(market, {}) |
|
if data: |
|
instruments = list(data.keys()) |
|
if not silent: |
|
safe_print(f" β
{len(instruments)} via general mapped endpoint") |
|
return instruments |
|
except Exception as e: |
|
if not silent: |
|
safe_print(f" β οΈ General mapped failed: {e}") |
|
|
|
if not silent: |
|
safe_print(f" β No instruments for {market}") |
|
return [] |
|
|
|
def get_market_summary(self, market: str) -> Dict: |
|
""" |
|
Get a comprehensive summary of a market's capabilities. |
|
|
|
Args: |
|
market: Market identifier |
|
|
|
Returns: |
|
Dictionary with market summary information |
|
""" |
|
endpoints = self.check_endpoint_compatibility(market) |
|
instruments = self.discover_instruments_for_market(market, silent=True) |
|
|
|
supported_endpoints = [name for name, status in endpoints.items() if status == EndpointStatus.SUPPORTED] |
|
|
|
return { |
|
"market_id": market, |
|
"total_instruments": len(instruments), |
|
"sample_instruments": instruments[:5], |
|
"supported_endpoints": supported_endpoints, |
|
"endpoint_details": endpoints, |
|
"is_functional": len(supported_endpoints) > 0 and len(instruments) > 0 |
|
} |
|
|
|
|
|
def test_market_comprehensively(client: IndexClient, market: str): |
|
""" |
|
Run comprehensive tests on a market with smart endpoint selection. |
|
|
|
Args: |
|
client: IndexClient instance |
|
market: Market identifier to test |
|
""" |
|
safe_print(f"\n{'='*60}") |
|
safe_print(f"π§ͺ COMPREHENSIVE MARKET TEST: {market}") |
|
safe_print(f"{'='*60}") |
|
|
|
|
|
summary = client.get_market_summary(market) |
|
|
|
safe_print(f"π Market Summary:") |
|
safe_print(f" Market ID: {summary['market_id']}") |
|
safe_print(f" Total Instruments: {summary['total_instruments']}") |
|
safe_print(f" Functional: {'β
' if summary['is_functional'] else 'β'}") |
|
safe_print(f" Supported Endpoints: {', '.join(summary['supported_endpoints'])}") |
|
|
|
if not summary['is_functional']: |
|
safe_print("β οΈ Market is not functional - skipping detailed tests") |
|
return |
|
|
|
instruments = summary['sample_instruments'][:3] |
|
safe_print(f"π§ Testing with instruments: {instruments}") |
|
|
|
|
|
endpoint_tests = { |
|
"eod_instruments": lambda: client.list_eod_markets_instruments(market=market), |
|
"mapped_instruments": lambda: client.list_markets_instruments(market=market, instruments=instruments), |
|
"tick_data": lambda: client.get_latest_tick(market=market, instruments=instruments), |
|
"historical_data": lambda: client.get_historical_days(market=market, instrument=instruments[0], limit=3), |
|
"metadata": lambda: client.get_latest_instrument_metadata(market=market, instruments=instruments) |
|
} |
|
|
|
results = {} |
|
|
|
for endpoint_name, test_func in endpoint_tests.items(): |
|
if endpoint_name in summary['supported_endpoints']: |
|
safe_print(f"\nπ§ͺ Testing {endpoint_name}...") |
|
try: |
|
response = test_func() |
|
data_count = len(response.get('Data', [])) |
|
results[endpoint_name] = "β
SUCCESS" |
|
safe_print(f" β
SUCCESS - Retrieved {data_count} data points") |
|
safe_print(f" π Response keys: {list(response.keys())}") |
|
except Exception as e: |
|
results[endpoint_name] = f"β FAILED: {str(e)[:100]}" |
|
safe_print(f" β FAILED: {str(e)[:100]}") |
|
else: |
|
results[endpoint_name] = "βοΈ SKIPPED (unsupported)" |
|
safe_print(f"\nβοΈ Skipping {endpoint_name} (unsupported)") |
|
|
|
|
|
safe_print(f"\nπ Test Results Summary:") |
|
for endpoint, result in results.items(): |
|
safe_print(f" {endpoint}: {result}") |
|
|
|
safe_print(f"\n{'='*60}") |
|
|
|
|
|
|
|
def fetch_all_functional_markets(): |
|
""" |
|
Fetch latest tick and 30-day history for BTC-USD, SOL-USD, ETH-USD |
|
across all functional markets. |
|
Save results in dta/coindesk/indexes. |
|
""" |
|
import config |
|
from client import APIError |
|
|
|
if not config.API_KEY: |
|
safe_print("β Error: COINDESK_API_KEY not set.") |
|
return |
|
|
|
client = IndexClient() |
|
safe_print("π Fetching data for all functional markets and BTC/SOL/ETH...") |
|
|
|
markets = [ |
|
"cadli", "ccix", "ccxrp", "ccxrpperp", |
|
"cd_mc", "cdi_b", "cdi_mda", "cdor", "sda" |
|
] |
|
instruments = ["BTC-USD", "SOL-USD", "ETH-USD"] |
|
|
|
for m in markets: |
|
safe_print(f"\nπ Market: {m}") |
|
market_results = {} |
|
for inst in instruments: |
|
|
|
try: |
|
tick = client.get_latest_tick(market=m, instruments=[inst]) |
|
data = tick.get("Data", {}).get(inst, {}) |
|
safe_print(f" πΈ {inst} latest price: {data.get('price', 'n/a')}") |
|
market_results[f"{inst}_latest_tick"] = tick |
|
except APIError as e: |
|
safe_print(f" β οΈ {inst} tick failed (status {e.status_code})") |
|
market_results[f"{inst}_latest_tick"] = {"error": f"APIError {e.status_code}"} |
|
except Exception as e: |
|
safe_print(f" β οΈ {inst} tick error: {e}") |
|
market_results[f"{inst}_latest_tick"] = {"error": str(e)} |
|
|
|
|
|
try: |
|
hist = client.get_historical_days( |
|
market=m, |
|
instrument=inst, |
|
limit=30, |
|
aggregate=1, |
|
fill=True |
|
) |
|
days = hist.get("Data", {}).get("values", []) |
|
safe_print(f" β’ {len(days)} days of history (first: {days[0] if days else 'n/a'})") |
|
market_results[f"{inst}_history"] = hist |
|
except APIError as e: |
|
safe_print(f" β οΈ {inst} history failed (status {e.status_code})") |
|
market_results[f"{inst}_history"] = {"error": f"APIError {e.status_code}"} |
|
except Exception as e: |
|
safe_print(f" β οΈ {inst} history error: {e}") |
|
market_results[f"{inst}_history"] = {"error": str(e)} |
|
|
|
|
|
save_json_result(f"{m}.json", market_results) |
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
fetch_all_functional_markets() |