"""
Comprehensive Santiment Data Fetcher
====================================
This module provides a complete data fetcher for the Santiment API using the sanpy library.
It maximizes data retrieval by organizing metrics into categories and providing batch operations.
Features:
- Fetches all available metrics organized by category
- Supports batch operations for efficient API usage
- Handles rate limiting and error management
- Provides data export capabilities
- Supports both single asset and multi-asset queries
- Includes SQL query execution for custom data needs
Author: AI Assistant
Version: 1.0.0
"""
import san
import pandas as pd
import numpy as np
import time
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Union, Any
import json
import os
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor, as_completed
# Load environment variables
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv not available, continue without it
import warnings
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Resolve data directory base
try:
from src.config import DATA_DIR as CFG_DATA_DIR
except Exception:
try:
from config import DATA_DIR as CFG_DATA_DIR
except Exception:
CFG_DATA_DIR = "/data"
from pathlib import Path
def _resolve_under_data(path_like: str | os.PathLike) -> str:
p = Path(path_like)
if p.is_absolute():
return str(p)
parts = p.parts
if parts and parts[0].lower() == "data":
rel = Path(*parts[1:]) if len(parts) > 1 else Path()
else:
rel = p
return str(Path(CFG_DATA_DIR) / rel)
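# Illustrative behaviour of _resolve_under_data, assuming CFG_DATA_DIR == "/data":
#   _resolve_under_data("data/santiment") -> "/data/santiment"   (leading "data" folds into DATA_DIR)
#   _resolve_under_data("santiment/raw")  -> "/data/santiment/raw"
#   _resolve_under_data("/tmp/exports")   -> "/tmp/exports"      (absolute paths pass through unchanged)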
@dataclass
class FetchConfig:
"""Configuration class for data fetching parameters - OPTIMIZED FOR API CONSERVATION"""
from_date: str = "2024-01-01" # Reduced from 2020 to save API calls
to_date: str = "utc_now"
interval: str = "1d"
include_incomplete_data: bool = False
batch_size: int = 25 # Reduced from 50 to save API calls
max_workers: int = 5 # Reduced from 10 to save API calls
rate_limit_delay: int = 60
export_format: str = "parquet" # csv, json, parquet
export_directory: str = "data/santiment"
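# Example override (values are illustrative): a short, hourly backfill. Relative dates
# such as "30d" are resolved by SantimentDataFetcher._normalize_dates at init time.
#
#     hourly_config = FetchConfig(
#         from_date="30d",
#         to_date="utc_now",
#         interval="1h",
#         export_format="csv",
#         export_directory="data/santiment_hourly",
#     )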
class SantimentDataFetcher:
"""
Comprehensive Santiment Data Fetcher
This class provides methods to fetch as much data as possible from the Santiment API
using the sanpy library with efficient batch operations and error handling.
"""
def __init__(self, api_key: Optional[str] = None, config: Optional[FetchConfig] = None):
"""
Initialize the Santiment Data Fetcher
Args:
api_key: Santiment API key(s) for accessing restricted data (comma-separated for multiple keys)
config: FetchConfig object with fetching parameters
"""
self.config = config or FetchConfig()
self._normalize_dates()
# Set up multiple API keys
self._setup_api_keys(api_key)
# Resolve export directory under DATA_DIR, create and clean up existing files
self.config.export_directory = _resolve_under_data(self.config.export_directory)
os.makedirs(self.config.export_directory, exist_ok=True)
self._cleanup_existing_files()
# Initialize data storage
self.fetched_data: Dict[str, pd.DataFrame] = {}
self.failed_queries: List[Dict] = []
# Define comprehensive metric categories
self.metric_categories = self._define_metric_categories()
# Get available metrics and projects
self._initialize_metadata()
# Initialize symbol normalization
self.symbol_normalizer = self._setup_symbol_normalizer()
def _setup_symbol_normalizer(self):
"""
Set up symbol normalization mapping for consistent asset identification
Returns:
Dictionary mapping various symbol formats to canonical slugs
"""
# Canonical mapping for major crypto assets
# Maps various symbols/names to the official Santiment slug
symbol_mapping = {
# Bitcoin variants
'bitcoin': 'bitcoin',
'btc': 'bitcoin',
'Bitcoin': 'bitcoin',
'BTC': 'bitcoin',
# Ethereum variants
'ethereum': 'ethereum',
'eth': 'ethereum',
'Ethereum': 'ethereum',
'ETH': 'ethereum',
# Ripple/XRP variants
'ripple': 'ripple',
'xrp': 'ripple',
'Ripple': 'ripple',
'XRP': 'ripple',
# Solana variants
'solana': 'solana',
'sol': 'solana',
'Solana': 'solana',
'SOL': 'solana',
# Cardano variants
'cardano': 'cardano',
'ada': 'cardano',
'Cardano': 'cardano',
'ADA': 'cardano',
# Polkadot variants
'polkadot': 'polkadot',
'dot': 'polkadot',
'Polkadot': 'polkadot',
'DOT': 'polkadot',
# Chainlink variants
'chainlink': 'chainlink',
'link': 'chainlink',
'Chainlink': 'chainlink',
'LINK': 'chainlink',
# Litecoin variants
'litecoin': 'litecoin',
'ltc': 'litecoin',
'Litecoin': 'litecoin',
'LTC': 'litecoin',
# Bitcoin Cash variants
'bitcoin-cash': 'bitcoin-cash',
'bch': 'bitcoin-cash',
'Bitcoin Cash': 'bitcoin-cash',
'BCH': 'bitcoin-cash',
# Stellar variants
'stellar': 'stellar',
'xlm': 'stellar',
'Stellar': 'stellar',
'XLM': 'stellar',
# Ethereum Classic variants
'ethereum-classic': 'ethereum-classic',
'etc': 'ethereum-classic',
'Ethereum Classic': 'ethereum-classic',
'ETC': 'ethereum-classic',
# EOS variants
'eos': 'eos',
'EOS': 'eos',
}
logger.info(f"Initialized symbol normalizer with {len(symbol_mapping)} mappings")
return symbol_mapping
def normalize_symbol(self, symbol: str) -> str:
"""
Normalize a symbol to its canonical Santiment slug
Args:
symbol: Symbol to normalize
Returns:
Canonical slug
"""
if symbol in self.symbol_normalizer:
canonical = self.symbol_normalizer[symbol]
if symbol != canonical:
logger.debug(f"Normalized '{symbol}' -> '{canonical}'")
return canonical
# If not found in mapping, return as-is but log warning
logger.warning(f"Unknown symbol '{symbol}' not found in normalization mapping")
return symbol.lower()
def get_symbol_alternatives(self, symbol: str) -> List[str]:
"""
Get all alternative symbols for a given symbol (both directions)
Args:
symbol: Symbol to find alternatives for
Returns:
List of alternative symbols including the original
"""
alternatives = [symbol]
# Create reverse mapping to find alternatives
reverse_mapping = {}
for variant, canonical in self.symbol_normalizer.items():
if canonical not in reverse_mapping:
reverse_mapping[canonical] = []
reverse_mapping[canonical].append(variant)
# If symbol is a canonical, get all its variants
if symbol in reverse_mapping:
alternatives.extend(reverse_mapping[symbol])
# If symbol is a variant, get the canonical and other variants
canonical = self.normalize_symbol(symbol)
if canonical in reverse_mapping:
alternatives.extend(reverse_mapping[canonical])
# Remove duplicates while preserving discovery order (original symbol stays first)
return list(dict.fromkeys(alternatives))
def fetch_single_metric_with_alternatives(self, metric: str, slug: str, **kwargs) -> Optional[pd.DataFrame]:
"""
Fetch a single metric for a single asset, trying alternative symbols if the primary fails
Args:
metric: The metric name
slug: The asset slug (will try alternatives if this fails)
**kwargs: Additional parameters for the API call
Returns:
DataFrame with the metric data or None if failed
"""
# Get all alternative symbols to try
alternatives = self.get_symbol_alternatives(slug)
logger.debug(f"Trying alternatives for {slug}: {alternatives}")
# Try each alternative in order (start with the normalized canonical form)
canonical = self.normalize_symbol(slug)
if canonical != slug:
alternatives = [canonical] + [alt for alt in alternatives if alt != canonical]
for i, alt_slug in enumerate(alternatives):
try:
data = self.fetch_single_metric(metric, alt_slug, **kwargs)
if data is not None and not data.empty:
if i > 0 or alt_slug != slug: # Successfully fetched with alternative
logger.info(f"[ALT_SUCCESS] {metric} for {slug} succeeded using alternative '{alt_slug}'")
# Update slug column to reflect the original requested slug for consistency
data['slug'] = slug
data['alternative_slug_used'] = alt_slug
return data
except Exception as e:
error_msg = str(e)
# Check if this is a metric-level error that won't be fixed by trying other slugs
if any(skip_phrase in error_msg.lower() for skip_phrase in [
'not supported for',
'not implemented for',
'outside the allowed interval',
'upgrade to a higher tier'
]):
logger.warning(f"[METRIC_SKIP] {metric} has fundamental issues, skipping all alternatives: {error_msg}")
break # Don't try other alternatives for this metric
# If it's just a slug issue, continue trying alternatives
if 'is not an existing slug' in error_msg.lower():
logger.debug(f"Alternative {alt_slug} failed for {metric}: {e}")
continue
else:
logger.debug(f"Alternative {alt_slug} failed for {metric}: {e}")
continue
logger.warning(f"[ALT_FAILED] All alternatives failed for {metric} with slug {slug}")
return None
def normalize_slug_list(self, slugs: List[str]) -> List[str]:
"""
Normalize a list of slugs and remove duplicates
Args:
slugs: List of slugs to normalize
Returns:
List of normalized, deduplicated slugs
"""
normalized = []
seen = set()
for slug in slugs:
canonical = self.normalize_symbol(slug)
if canonical not in seen:
normalized.append(canonical)
seen.add(canonical)
else:
logger.debug(f"Removed duplicate slug: {slug} (canonical: {canonical})")
logger.info(f"Normalized {len(slugs)} slugs to {len(normalized)} unique canonical slugs")
return normalized
def _normalize_dates(self):
"""
Convert relative date strings in self.config.from_date / to_date
into absolute YYYY-MM-DD dates that sanpy can parse.
Supports:
- "<N>d" (e.g. "30d") → today minus N days
- "utc_now" → today (UTC)
"""
now = datetime.utcnow()
# from_date: e.g. "30d"
fd = self.config.from_date.strip().lower()
if fd.endswith('d') and fd[:-1].isdigit():
days = int(fd[:-1])
from_dt = now - timedelta(days=days)
# Sanpy expects "YYYY-MM-DD"
self.config.from_date = from_dt.strftime('%Y-%m-%d')
# to_date: sometimes set to "utc_now"
td = self.config.to_date.strip().lower()
if td == 'utc_now':
self.config.to_date = now.strftime('%Y-%m-%d')
def _setup_api_keys(self, api_key: Optional[str] = None):
"""
Set up multiple API keys for rate limit handling
Args:
api_key: API key(s) - can be comma-separated for multiple keys
"""
# Parse API keys from parameter or environment
api_key_string = api_key or os.getenv('SANTIMENT_API_KEY')
if api_key_string:
# Support comma-separated API keys
self.api_keys = [key.strip() for key in api_key_string.split(',') if key.strip()]
logger.info(f"Santiment fetcher initialized with {len(self.api_keys)} API key(s)")
# Check if all keys are from the same account
if len(self.api_keys) > 1:
logger.info("Multiple API keys detected. Testing key diversity...")
self._validate_api_key_diversity()
else:
self.api_keys = []
logger.warning("No API key provided - limited to free tier data")
# Initialize API key management
self.current_key_index = 0
self.rate_limit_switches = 0
# Set initial API key
if self.api_keys:
self._set_current_api_key()
def _validate_api_key_diversity(self):
"""
Validate that API keys are from different accounts for effective rate limit handling
"""
try:
user_ids = set()
functional_keys = 0
rate_limited_keys = 0
for i, key in enumerate(self.api_keys[:3]): # Test only first 3 to avoid exhausting quota
# Temporarily set this key
san.ApiConfig.api_key = key
try:
# Make a simple query to get user info
result = san.execute_sql(query="SELECT 1", set_index=None)
# If successful, key is functional but we can't determine user ID without error
functional_keys += 1
logger.info(f"API Key #{i+1}: {key[:8]}... appears functional")
except Exception as e:
error_str = str(e)
if 'user with id' in error_str:
# Extract user ID from error message
import re
match = re.search(r'user with id (\d+)', error_str)
if match:
user_id = match.group(1)
user_ids.add(user_id)
rate_limited_keys += 1
logger.info(f"API Key #{i+1}: {key[:8]}... belongs to user ID {user_id} (rate limited)")
else:
logger.debug(f"API Key #{i+1}: {key[:8]}... - {error_str}")
# Reset to first key
self.current_key_index = 0
self._set_current_api_key()
# Analyze results
if rate_limited_keys > 0 and len(user_ids) == 1:
if functional_keys > 0:
logger.warning("⚠️ WARNING: Cannot determine if all API keys are from different accounts!")
logger.warning(f"⚠️ {rate_limited_keys} key(s) belong to user ID {list(user_ids)[0]}, {functional_keys} key(s) appear functional")
logger.warning("⚠️ If functional keys are from the same account, rate limit switching won't work.")
logger.warning("⚠️ For guaranteed effective rate limiting, use API keys from different Santiment accounts.")
logger.warning("⚠️ Create additional accounts at https://app.santiment.net/")
else:
logger.warning("⚠️ WARNING: All tested API keys belong to the same Santiment account!")
logger.warning("⚠️ Rate limits are applied per account, not per key.")
logger.warning("⚠️ API key switching will not be effective with same-account keys.")
logger.warning("⚠️ Create additional accounts at https://app.santiment.net/")
elif len(user_ids) > 1:
logger.info(f"✅ Good! API keys are from {len(user_ids)} different accounts.")
logger.info("✅ This will provide effective rate limit distribution.")
elif functional_keys == len(self.api_keys):
logger.info("✅ All API keys appear functional.")
logger.info("ℹ️ Cannot determine account diversity without rate limit errors.")
logger.info("ℹ️ Monitor rate limit switches during operation to verify effectiveness.")
except Exception as e:
logger.debug(f"Could not validate API key diversity: {e}")
logger.info("API key diversity validation skipped - continuing with provided keys")
def _set_current_api_key(self):
"""Set the current API key in san.ApiConfig"""
if self.api_keys:
current_key = self.api_keys[self.current_key_index]
san.ApiConfig.api_key = current_key
logger.info(f"Using API key #{self.current_key_index + 1}: {current_key[:8]}...")
else:
san.ApiConfig.api_key = None
def _switch_api_key(self):
"""Switch to the next available API key"""
if len(self.api_keys) <= 1:
logger.warning("Only one or no API keys available, cannot switch")
return False
old_index = self.current_key_index
self.current_key_index = (self.current_key_index + 1) % len(self.api_keys)
self.rate_limit_switches += 1
logger.info(f"[SWITCH] Switching from API key #{old_index + 1} to #{self.current_key_index + 1} (switch #{self.rate_limit_switches})")
# Warn if switching too frequently (indicates same account issue)
if self.rate_limit_switches > len(self.api_keys) * 2:
logger.warning("⚠️ High number of API key switches detected!")
logger.warning("⚠️ This suggests all keys may be from the same account.")
logger.warning("⚠️ Consider using API keys from different Santiment accounts.")
# Set new API key
self._set_current_api_key()
# Add a delay after switching keys
time.sleep(2.0)
return True
def _is_rate_limit_error(self, error_message):
"""Check if the error indicates a rate limit issue"""
rate_limit_indicators = [
"429",
"rate limit",
"too many requests",
"api limit",
"quota exceeded",
"limit exceeded",
"rate_limit_exception",
"API Rate Limit Reached",
"rate limit reached"
]
error_str = str(error_message).lower()
return any(indicator in error_str for indicator in rate_limit_indicators)
def _cleanup_existing_files(self):
"""
Clean up all existing files in the export directory before starting a new fetch.
This prevents accumulation of old data files from previous runs.
"""
import glob
import shutil
if not os.path.exists(self.config.export_directory):
return
try:
# Get all files in the export directory
all_files = glob.glob(os.path.join(self.config.export_directory, "*"))
if all_files:
logger.info(f"Cleaning up {len(all_files)} existing files in {self.config.export_directory}")
for file_path in all_files:
try:
if os.path.isfile(file_path):
os.remove(file_path)
logger.debug(f"Removed file: {os.path.basename(file_path)}")
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
logger.debug(f"Removed directory: {os.path.basename(file_path)}")
except Exception as e:
logger.warning(f"Failed to remove {file_path}: {e}")
logger.info(f"Successfully cleaned up export directory: {self.config.export_directory}")
else:
logger.info(f"Export directory is already clean: {self.config.export_directory}")
except Exception as e:
logger.error(f"Failed to cleanup export directory {self.config.export_directory}: {e}")
# Don't raise the exception - just log it and continue
def _define_metric_categories(self) -> Dict[str, List[str]]:
"""Define REDUCED categories of Santiment metrics for API conservation."""
return {
# Essential Financial Metrics Only
'financial': [
'price_usd', 'marketcap_usd', 'volume_usd'
# Reduced from 12 to 3 most important metrics
],
# Core Network Activity
'network_activity': [
'daily_active_addresses', 'new_addresses'
# Reduced from 9 to 2 most important metrics
],
# Basic Transaction Metrics
'transactions': [
'transaction_count', 'transaction_volume_usd'
# Reduced from 8 to 2 most important metrics
],
# Essential Exchange Metrics
'exchange': [
'exchange_inflow', 'exchange_outflow'
# Reduced from 8 to 2 most important metrics
]
# Removed: supply, development, social, derivatives, whales
# This reduces API calls by ~70% while keeping core metrics
}
def _initialize_metadata(self):
"""Initialize metadata about available metrics and projects"""
try:
logger.info("Fetching available metrics...")
self.available_metrics = san.available_metrics()
logger.info(f"Found {len(self.available_metrics)} available metrics")
logger.info("Fetching available projects...")
self.projects_df = san.get("projects/all")
self.available_slugs = self.projects_df['slug'].tolist()
logger.info(f"Found {len(self.available_slugs)} available projects")
except Exception as e:
logger.error(f"Failed to initialize metadata: {e}")
self.available_metrics = []
self.available_slugs = []
def get_metric_metadata(self, metric: str) -> Dict[str, Any]:
"""
Get metadata for a specific metric
Args:
metric: The metric name
Returns:
Dictionary containing metric metadata
"""
try:
metadata = san.metadata(
metric,
arr=["availableSlugs", "defaultAggregation", "humanReadableName",
"isAccessible", "isRestricted", "restrictedFrom", "restrictedTo"]
)
return metadata
except Exception as e:
logger.warning(f"Failed to get metadata for {metric}: {e}")
return {}
def fetch_single_metric(self, metric: str, slug: str, **kwargs) -> Optional[pd.DataFrame]:
"""
Fetch a single metric for a single asset
Args:
metric: The metric name
slug: The asset slug
**kwargs: Additional parameters for the API call
Returns:
DataFrame with the metric data or None if failed
"""
max_retries = len(self.api_keys) if self.api_keys else 1
keys_tried = set()
for attempt in range(max_retries):
try:
# If we've tried all keys, reset and wait
if len(keys_tried) >= len(self.api_keys) and self.api_keys:
logger.warning(f"All {len(self.api_keys)} API keys exhausted for {metric}, waiting 30 seconds...")
time.sleep(30)
keys_tried.clear()
self.current_key_index = 0
self._set_current_api_key()
params = {
'slug': slug,
'from_date': kwargs.get('from_date', self.config.from_date),
'to_date': kwargs.get('to_date', self.config.to_date),
'interval': kwargs.get('interval', self.config.interval),
'include_incomplete_data': kwargs.get('include_incomplete_data', self.config.include_incomplete_data)
}
# Add any additional selector parameters
if 'selector' in kwargs:
params['selector'] = kwargs['selector']
data = san.get(metric, **params)
if data is not None and not data.empty:
# Add metadata columns
data['metric'] = metric
data['slug'] = slug
if attempt > 0:
logger.info(f"[SUCCESS] {metric} for {slug} succeeded on attempt {attempt + 1}")
return data
except Exception as e:
error_msg = str(e)
keys_tried.add(self.current_key_index)
# Check if it's a rate limit error
if self._is_rate_limit_error(error_msg) and self.api_keys:
logger.warning(f"[RATE_LIMIT] API key #{self.current_key_index + 1} hit rate limit for {metric}: {error_msg}")
# Check if we've tried all keys
if len(keys_tried) >= len(self.api_keys):
logger.error(f"All {len(self.api_keys)} API keys exhausted for {metric}. Skipping.")
break # Exit retry loop since all keys are exhausted
# Try to switch to next API key
if self._switch_api_key():
continue # Retry with new API key
else:
logger.error("No more API keys available for switching")
# Handle rate limit with san library specific check
if hasattr(san, 'is_rate_limit_exception') and san.is_rate_limit_exception(e):
if hasattr(san, 'rate_limit_time_left'):
rate_limit_seconds = san.rate_limit_time_left(e)
logger.warning(f"Santiment rate limit hit. Sleeping for {rate_limit_seconds} seconds")
time.sleep(rate_limit_seconds)
else:
# Try switching API key if available
if self.api_keys and self._switch_api_key():
continue
else:
time.sleep(60) # Default wait
else:
# Check for specific error types that mean we should skip this metric entirely
if any(skip_phrase in error_msg.lower() for skip_phrase in [
'not supported for',
'is not an existing slug',
'not implemented for',
'missing_contract',
'outside the allowed interval',
'upgrade to a higher tier'
]):
logger.warning(f"[SKIP] {metric} for {slug} - {error_msg}")
return None # Skip this metric/slug combination entirely
logger.error(f"Failed to fetch {metric} for {slug}: {error_msg}")
error_info = {
'metric': metric,
'slug': slug,
'error': error_msg,
'timestamp': datetime.now().isoformat(),
'api_key_index': self.current_key_index
}
self.failed_queries.append(error_info)
return None
def fetch_multi_asset_metric(self, metric: str, slugs: List[str], **kwargs) -> Optional[pd.DataFrame]:
"""
Fetch a single metric for multiple assets using get_many
Args:
metric: The metric name
slugs: List of asset slugs
**kwargs: Additional parameters for the API call
Returns:
DataFrame with the metric data or None if failed
"""
max_retries = len(self.api_keys) if self.api_keys else 1
keys_tried = set()
for attempt in range(max_retries):
try:
# If we've tried all keys, reset and wait
if len(keys_tried) >= len(self.api_keys) and self.api_keys:
logger.warning(f"All {len(self.api_keys)} API keys exhausted for {metric}, waiting 30 seconds...")
time.sleep(30)
keys_tried.clear()
self.current_key_index = 0
self._set_current_api_key()
params = {
'slugs': slugs,
'from_date': kwargs.get('from_date', self.config.from_date),
'to_date': kwargs.get('to_date', self.config.to_date),
'interval': kwargs.get('interval', self.config.interval),
'include_incomplete_data': kwargs.get('include_incomplete_data', self.config.include_incomplete_data)
}
data = san.get_many(metric, **params)
if data is not None and not data.empty:
# Reshape data for consistent format
data_melted = data.reset_index().melt(
id_vars=['datetime'],
var_name='slug',
value_name='value'
)
data_melted['metric'] = metric
data_melted.set_index('datetime', inplace=True)
if attempt > 0:
logger.info(f"[SUCCESS] {metric} for multiple assets succeeded on attempt {attempt + 1}")
return data_melted
except Exception as e:
error_msg = str(e)
keys_tried.add(self.current_key_index)
# Check if it's a rate limit error
if self._is_rate_limit_error(error_msg) and self.api_keys:
logger.warning(f"[RATE_LIMIT] API key #{self.current_key_index + 1} hit rate limit for {metric}: {error_msg}")
# Check if we've tried all keys
if len(keys_tried) >= len(self.api_keys):
logger.error(f"All {len(self.api_keys)} API keys exhausted for {metric}. Skipping.")
break # Exit retry loop since all keys are exhausted
# Try to switch to next API key
if self._switch_api_key():
continue # Retry with new API key
else:
logger.error("No more API keys available for switching")
# Handle rate limit with san library specific check
if hasattr(san, 'is_rate_limit_exception') and san.is_rate_limit_exception(e):
if hasattr(san, 'rate_limit_time_left'):
rate_limit_seconds = san.rate_limit_time_left(e)
logger.warning(f"Santiment rate limit hit. Sleeping for {rate_limit_seconds} seconds")
time.sleep(rate_limit_seconds)
else:
# Try switching API key if available
if self.api_keys and self._switch_api_key():
continue
else:
time.sleep(60) # Default wait
else:
logger.error(f"Failed to fetch {metric} for multiple assets: {error_msg}")
error_info = {
'metric': metric,
'slugs': slugs,
'error': error_msg,
'timestamp': datetime.now().isoformat(),
'api_key_index': self.current_key_index
}
self.failed_queries.append(error_info)
return None
def fetch_category_batch(self, category: str, slugs: List[str], use_async_batch: bool = True) -> Dict[str, pd.DataFrame]:
"""
Fetch all metrics in a category using batch operations with symbol alternatives fallback
Args:
category: The metric category name
slugs: List of asset slugs to fetch for
use_async_batch: Whether to use AsyncBatch (recommended) or Batch
Returns:
Dictionary mapping metric names to DataFrames
"""
if category not in self.metric_categories:
logger.error(f"Unknown category: {category}")
return {}
metrics = self.metric_categories[category]
category_data = {}
# Filter metrics that are actually available
available_metrics_in_category = [m for m in metrics if m in self.available_metrics]
if not available_metrics_in_category:
logger.warning(f"No available metrics found for category: {category}")
return {}
logger.info(f"Fetching {len(available_metrics_in_category)} metrics for category: {category}")
# First try batch operation with normalized slugs
normalized_slugs = self.normalize_slug_list(slugs)
batch_success = self._try_batch_fetch(category, available_metrics_in_category, normalized_slugs, use_async_batch)
category_data.update(batch_success)
# For failed metrics, try individual fetches with alternatives
failed_metrics = [m for m in available_metrics_in_category if m not in batch_success]
if failed_metrics:
logger.info(f"Retrying {len(failed_metrics)} failed metrics with alternatives")
individual_results = self._fetch_failed_metrics_with_alternatives(failed_metrics, slugs)
category_data.update(individual_results)
return category_data
def _try_batch_fetch(self, category: str, metrics: List[str], slugs: List[str], use_async_batch: bool) -> Dict[str, pd.DataFrame]:
"""Try batch fetch operation"""
category_data = {}
try:
if use_async_batch:
batch = san.AsyncBatch()
else:
batch = san.Batch()
# Add queries to batch
for metric in metrics:
try:
if len(slugs) == 1:
batch.get(
metric,
slug=slugs[0],
from_date=self.config.from_date,
to_date=self.config.to_date,
interval=self.config.interval,
include_incomplete_data=self.config.include_incomplete_data
)
else:
batch.get_many(
metric,
slugs=slugs,
from_date=self.config.from_date,
to_date=self.config.to_date,
interval=self.config.interval,
include_incomplete_data=self.config.include_incomplete_data
)
except Exception as e:
logger.warning(f"Failed to add {metric} to batch: {e}")
# Execute batch
if use_async_batch:
results = batch.execute(max_workers=self.config.max_workers)
else:
results = batch.execute()
# Process results
for i, (metric, result) in enumerate(zip(metrics, results)):
if result is not None and not result.empty:
if len(slugs) > 1:
# Reshape multi-asset data
result_melted = result.reset_index().melt(
id_vars=['datetime'],
var_name='slug',
value_name='value'
)
result_melted['metric'] = metric
result_melted.set_index('datetime', inplace=True)
category_data[metric] = result_melted
else:
result['metric'] = metric
result['slug'] = slugs[0]
category_data[metric] = result
else:
logger.debug(f"No data received for metric: {metric} in batch")
except Exception as e:
logger.error(f"Batch execution failed for category {category}: {e}")
return category_data
def _fetch_failed_metrics_with_alternatives(self, metrics: List[str], original_slugs: List[str]) -> Dict[str, pd.DataFrame]:
"""Fetch failed metrics individually using symbol alternatives"""
individual_data = {}
for metric in metrics:
logger.info(f"Retrying {metric} with symbol alternatives...")
if len(original_slugs) == 1:
# Single asset - use alternatives
result = self.fetch_single_metric_with_alternatives(metric, original_slugs[0])
if result is not None:
individual_data[metric] = result
else:
# Multiple assets - try each with alternatives and combine
all_results = []
for slug in original_slugs:
result = self.fetch_single_metric_with_alternatives(metric, slug)
if result is not None:
all_results.append(result)
if all_results:
# Concatenate results - they already have datetime as index
combined_result = pd.concat(all_results, ignore_index=False, sort=False)
# Ensure datetime index is properly set
if not isinstance(combined_result.index, pd.DatetimeIndex):
if 'datetime' in combined_result.columns:
combined_result.set_index('datetime', inplace=True)
individual_data[metric] = combined_result
return individual_data
def fetch_special_metrics(self, slugs: List[str]) -> Dict[str, pd.DataFrame]:
"""
Fetch special metrics that have different API signatures
Args:
slugs: List of asset slugs
Returns:
Dictionary mapping metric names to DataFrames
"""
special_data = {}
for slug in slugs:
max_retries = len(self.api_keys) if self.api_keys else 1
keys_tried = set()
for attempt in range(max_retries):
try:
# If we've tried all keys, reset and wait
if len(keys_tried) >= len(self.api_keys) and self.api_keys:
logger.warning(f"All {len(self.api_keys)} API keys exhausted for special metrics on {slug}, waiting 30 seconds...")
time.sleep(30)
keys_tried.clear()
self.current_key_index = 0
self._set_current_api_key()
# OHLCV data
logger.info(f"Fetching OHLCV data for {slug}")
ohlcv = san.get(
f"ohlcv/{slug}",
from_date=self.config.from_date,
to_date=self.config.to_date,
interval=self.config.interval
)
if ohlcv is not None and not ohlcv.empty:
ohlcv['metric'] = 'ohlcv'
ohlcv['slug'] = slug
special_data[f'ohlcv_{slug}'] = ohlcv
# Prices with OHLC format
logger.info(f"Fetching detailed prices for {slug}")
prices = san.get(
"prices",
slug=slug,
from_date=self.config.from_date,
to_date=self.config.to_date,
interval=self.config.interval
)
if prices is not None and not prices.empty:
prices['metric'] = 'prices_detailed'
prices['slug'] = slug
special_data[f'prices_{slug}'] = prices
# If we get here, the attempt was successful
break
except Exception as e:
error_msg = str(e)
keys_tried.add(self.current_key_index)
# Check if it's a rate limit error
if self._is_rate_limit_error(error_msg) and self.api_keys:
logger.warning(f"[RATE_LIMIT] API key #{self.current_key_index + 1} hit rate limit for special metrics on {slug}: {error_msg}")
# Check if we've tried all keys
if len(keys_tried) >= len(self.api_keys):
logger.error(f"All {len(self.api_keys)} API keys exhausted for special metrics on {slug}. Skipping.")
break # Exit retry loop since all keys are exhausted
# Try to switch to next API key
if self._switch_api_key():
continue # Retry with new API key
else:
logger.error("No more API keys available for switching")
logger.error(f"Failed to fetch special metrics for {slug}: {e}")
break # Exit retry loop for this slug
return special_data
def fetch_blockchain_address_data(self, addresses: List[str], slugs: List[str]) -> Dict[str, pd.DataFrame]:
"""
Fetch blockchain address-related data
Args:
addresses: List of blockchain addresses
slugs: List of asset slugs for context
Returns:
Dictionary mapping data types to DataFrames
"""
address_data = {}
for slug in slugs:
for address in addresses:
try:
# Historical balance
balance = san.get(
"historical_balance",
slug=slug,
address=address,
from_date=self.config.from_date,
to_date=self.config.to_date,
interval=self.config.interval
)
if balance is not None and not balance.empty:
balance['address'] = address
balance['slug'] = slug
address_data[f'historical_balance_{slug}_{address[:8]}'] = balance
# Top transactions
top_txs = san.get(
"eth_top_transactions",
slug=slug,
from_date=self.config.from_date,
to_date=self.config.to_date,
limit=100,
transaction_type="ALL"
)
if top_txs is not None and not top_txs.empty:
top_txs['slug'] = slug
address_data[f'eth_top_transactions_{slug}'] = top_txs
except Exception as e:
logger.error(f"Failed to fetch address data for {address} on {slug}: {e}")
return address_data
def execute_custom_sql_queries(self) -> Dict[str, pd.DataFrame]:
"""
Execute custom SQL queries for additional data insights, using dictGetString for asset metadata.
Returns:
Dictionary mapping query names to DataFrames
"""
sql_data = {}
custom_queries = {
'top_assets_by_volume': """
SELECT
dictGetString('default.asset_metadata_dict', 'name', asset_id) as asset_name,
dictGetString('default.asset_metadata_dict', 'slug', asset_id) as slug,
SUM(value) as total_volume
FROM daily_metrics_v2
WHERE metric_id = get_metric_id('volume_usd')
AND dt >= now() - INTERVAL 30 DAY
GROUP BY asset_id
ORDER BY total_volume DESC
LIMIT 50
""",
'recent_high_activity_addresses': """
SELECT
dictGetString('default.asset_metadata_dict', 'name', asset_id) as asset_name,
get_metric_name(metric_id) as metric_name,
dt,
value
FROM daily_metrics_v2
WHERE metric_id = get_metric_id('daily_active_addresses')
AND dt >= now() - INTERVAL 7 DAY
AND value > 1000
ORDER BY dt DESC, value DESC
LIMIT 100
""",
'exchange_flow_summary': """
SELECT
dictGetString('default.asset_metadata_dict', 'name', asset_id) as asset_name,
dt,
SUM(CASE WHEN metric_id = get_metric_id('exchange_inflow') THEN value ELSE 0 END) as inflow,
SUM(CASE WHEN metric_id = get_metric_id('exchange_outflow') THEN value ELSE 0 END) as outflow
FROM daily_metrics_v2
WHERE metric_id IN (get_metric_id('exchange_inflow'), get_metric_id('exchange_outflow'))
AND dt >= now() - INTERVAL 30 DAY
GROUP BY asset_id, dt
ORDER BY dt DESC
LIMIT 1000
"""
}
for query_name, query in custom_queries.items():
try:
logger.info(f"Executing SQL query: {query_name}")
result = san.execute_sql(query=query, set_index="dt" if "dt" in query else None)
if result is not None and not result.empty:
sql_data[query_name] = result
logger.info(f"SQL query {query_name} returned {len(result)} rows")
except Exception as e:
logger.error(f"Failed to execute SQL query {query_name}: {e}")
return sql_data
def fetch_comprehensive_data(self,
slugs: List[str] = None,
categories: List[str] = None,
include_special_metrics: bool = True,
include_sql_queries: bool = True,
addresses: List[str] = None) -> Dict[str, Any]:
"""
Fetch comprehensive data across all categories and metrics
Args:
slugs: List of asset slugs (if None, uses top assets)
categories: List of categories to fetch (if None, fetches all)
include_special_metrics: Whether to include special format metrics
include_sql_queries: Whether to execute custom SQL queries
addresses: List of blockchain addresses for address-specific data
Returns:
Dictionary containing all fetched data organized by category
"""
# Set defaults
if slugs is None:
slugs = ['bitcoin', 'ethereum', 'cardano', 'polkadot', 'chainlink',
'litecoin', 'bitcoin-cash', 'stellar', 'ethereum-classic', 'eos']
# Normalize and deduplicate slugs
slugs = self.normalize_slug_list(slugs)
if categories is None:
categories = list(self.metric_categories.keys())
# Limit slugs for free tier
if not san.ApiConfig.api_key:
slugs = slugs[:3] # Limit to 3 assets for free tier
logger.warning("No API key detected. Limiting to 3 assets to avoid rate limits.")
all_data = {}
start_time = datetime.now()
logger.info(f"Starting comprehensive data fetch for {len(slugs)} assets across {len(categories)} categories")
# Check if all API keys are exhausted early
all_keys_exhausted = False
if self.api_keys and self.rate_limit_switches > len(self.api_keys) * 3:
logger.warning("⚠️ All API keys appear to be rate-limited. Attempting reduced fetch...")
all_keys_exhausted = True
# Fetch data by category
for category in categories:
if all_keys_exhausted:
logger.info(f"Skipping category {category} due to API exhaustion")
continue
logger.info(f"Fetching category: {category}")
category_data = self.fetch_category_batch(category, slugs, use_async_batch=True)
if category_data:
all_data[category] = category_data
# Store individual DataFrames for later use
for metric_name, df in category_data.items():
self.fetched_data[f"{category}_{metric_name}"] = df
# Check if we should stop due to rate limits
if self.rate_limit_switches > len(self.api_keys) * 5:
logger.warning("⚠️ Excessive rate limit switches detected. Stopping data fetch to avoid further exhaustion.")
all_keys_exhausted = True
break
# Fetch special metrics (only if not exhausted)
if include_special_metrics and not all_keys_exhausted:
logger.info("Fetching special metrics...")
special_data = self.fetch_special_metrics(slugs)
if special_data:
all_data['special_metrics'] = special_data
self.fetched_data.update(special_data)
elif all_keys_exhausted:
logger.info("Skipping special metrics due to API exhaustion")
# Fetch blockchain address data
if addresses and not all_keys_exhausted:
logger.info("Fetching blockchain address data...")
address_data = self.fetch_blockchain_address_data(addresses, slugs)
if address_data:
all_data['address_data'] = address_data
self.fetched_data.update(address_data)
elif addresses and all_keys_exhausted:
logger.info("Skipping blockchain address data due to API exhaustion")
# Execute SQL queries (only if not exhausted)
if include_sql_queries and san.ApiConfig.api_key and not all_keys_exhausted:
logger.info("Executing custom SQL queries...")
sql_data = self.execute_custom_sql_queries()
if sql_data:
all_data['sql_queries'] = sql_data
self.fetched_data.update(sql_data)
elif all_keys_exhausted:
logger.info("Skipping SQL queries due to API exhaustion")
end_time = datetime.now()
duration = end_time - start_time
logger.info(f"Comprehensive data fetch completed in {duration}")
logger.info(f"Successfully fetched {len(self.fetched_data)} datasets")
logger.info(f"Failed queries: {len(self.failed_queries)}")
# Add exhaustion notice to summary
if all_keys_exhausted:
logger.warning("⚠️ Data fetch completed with API rate limit exhaustion - some data may be missing")
# Generate summary
summary = self._generate_fetch_summary(all_data, duration)
summary['all_keys_exhausted'] = all_keys_exhausted
summary['rate_limit_switches'] = self.rate_limit_switches
all_data['fetch_summary'] = summary
return all_data
def _generate_fetch_summary(self, data: Dict[str, Any], duration: timedelta) -> Dict[str, Any]:
"""Generate a summary of the data fetching operation"""
summary = {
'fetch_duration': str(duration),
'total_datasets': len(self.fetched_data),
'failed_queries': len(self.failed_queries),
'categories_fetched': list(data.keys()),
'data_points_by_category': {},
'date_range': f"{self.config.from_date} to {self.config.to_date}",
'interval': self.config.interval,
'timestamp': datetime.now().isoformat()
}
# Count data points by category
for category, category_data in data.items():
if isinstance(category_data, dict):
total_points = sum(len(df) for df in category_data.values() if isinstance(df, pd.DataFrame))
summary['data_points_by_category'][category] = total_points
return summary
def export_data(self,
export_format: str = None,
combine_categories: bool = False,
include_metadata: bool = True) -> Dict[str, str]:
"""
Export fetched data to files
Args:
export_format: Export format ('csv', 'json', 'parquet')
combine_categories: Whether to combine all data into single files
include_metadata: Whether to include metadata files
Returns:
Dictionary mapping data names to file paths
"""
export_format = export_format or self.config.export_format
exported_files = {}
if not self.fetched_data:
logger.warning("No data to export")
return exported_files
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if combine_categories:
# Combine all DataFrames
all_dfs = []
for name, df in self.fetched_data.items():
if isinstance(df, pd.DataFrame) and not df.empty:
df_copy = df.copy().reset_index()  # keep the datetime index as a column in the combined export
df_copy['dataset_name'] = name
all_dfs.append(df_copy)
if all_dfs:
combined_df = pd.concat(all_dfs, ignore_index=True, sort=False)
filename = f"santiment_comprehensive_data_{timestamp}.{export_format}"
filepath = os.path.join(self.config.export_directory, filename)
self._export_dataframe(combined_df, filepath, export_format)
exported_files['combined_data'] = filepath
else:
# Export individual datasets
for name, df in self.fetched_data.items():
if isinstance(df, pd.DataFrame) and not df.empty:
filename = f"santiment_{name}_{timestamp}.{export_format}"
filepath = os.path.join(self.config.export_directory, filename)
self._export_dataframe(df, filepath, export_format)
exported_files[name] = filepath
# Export metadata and summary
if include_metadata:
metadata = {
'failed_queries': self.failed_queries,
'available_metrics': self.available_metrics,
'config': {
'from_date': self.config.from_date,
'to_date': self.config.to_date,
'interval': self.config.interval,
'batch_size': self.config.batch_size
},
'export_timestamp': datetime.now().isoformat()
}
metadata_file = os.path.join(self.config.export_directory, f"santiment_metadata_{timestamp}.json")
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
exported_files['metadata'] = metadata_file
logger.info(f"Exported {len(exported_files)} files to {self.config.export_directory}")
return exported_files
def _export_dataframe(self, df: pd.DataFrame, filepath: str, format_type: str):
"""Export a DataFrame to the specified format"""
try:
if format_type == 'csv':
df.to_csv(filepath)
elif format_type == 'json':
df.to_json(filepath, date_format='iso', orient='records')
elif format_type == 'parquet':
df.to_parquet(filepath)
else:
logger.error(f"Unsupported export format: {format_type}")
return
logger.info(f"Exported DataFrame to {filepath}")
except Exception as e:
logger.error(f"Failed to export DataFrame to {filepath}: {e}")
def get_api_usage_stats(self) -> Dict[str, Any]:
"""Get API usage statistics"""
try:
stats = {
'calls_made': san.api_calls_made(),
'calls_remaining': san.api_calls_remaining(),
'failed_queries': len(self.failed_queries),
'successful_datasets': len(self.fetched_data)
}
return stats
except Exception as e:
logger.error(f"Failed to get API usage stats: {e}")
return {}
def print_summary(self):
"""Print a comprehensive summary of the fetching operation"""
print("\n" + "="*60)
print("SANTIMENT DATA FETCHER SUMMARY")
print("="*60)
# Basic stats
print(f"Total datasets fetched: {len(self.fetched_data)}")
print(f"Failed queries: {len(self.failed_queries)}")
# Configuration info
print(f"\nConfiguration:")
print(f" Date range: {self.config.from_date} to {self.config.to_date}")
print(f" Interval: {self.config.interval}")
print(f" Export directory: {self.config.export_directory}")
# Categories summary
if self.fetched_data:
print(f"\nData by category:")
category_counts = {}
for key in self.fetched_data.keys():
if '_' in key:
category = key.split('_')[0]
category_counts[category] = category_counts.get(category, 0) + 1
for category, count in sorted(category_counts.items()):
print(f" {category}: {count} datasets")
# Sample data info
if self.fetched_data:
print(f"\nSample datasets:")
for i, (name, df) in enumerate(list(self.fetched_data.items())[:5]):
if isinstance(df, pd.DataFrame):
print(f" {name}: {len(df)} rows, {len(df.columns)} columns")
if not df.empty:
date_range = f"{df.index.min()} to {df.index.max()}" if hasattr(df.index, 'min') else "N/A"
print(f" Date range: {date_range}")
# Failed queries summary
if self.failed_queries:
print(f"\nFailed queries summary:")
error_types = {}
for failed in self.failed_queries:
error_msg = str(failed.get('error', 'Unknown error'))
error_type = error_msg.split(':')[0] if ':' in error_msg else error_msg
error_types[error_type] = error_types.get(error_type, 0) + 1
for error_type, count in sorted(error_types.items()):
print(f" {error_type}: {count} occurrences")
# API usage stats
try:
api_stats = self.get_api_usage_stats()
if api_stats:
print(f"\nAPI Usage:")
print(f" Calls made: {api_stats.get('calls_made', 'N/A')}")
print(f" Calls remaining: {api_stats.get('calls_remaining', 'N/A')}")
except Exception:
pass
print("="*60)
def analyze_data_quality(self) -> Dict[str, Any]:
"""Analyze the quality of fetched data"""
quality_report = {
'total_datasets': len(self.fetched_data),
'empty_datasets': 0,
'datasets_with_nulls': 0,
'date_coverage': {},
'data_completeness': {},
'outliers_detected': {}
}
for name, df in self.fetched_data.items():
if isinstance(df, pd.DataFrame):
# Check if dataset is empty
if df.empty:
quality_report['empty_datasets'] += 1
continue
# Check for null values
if df.isnull().any().any():
quality_report['datasets_with_nulls'] += 1
null_percentage = (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
quality_report['data_completeness'][name] = f"{100 - null_percentage:.2f}%"
# Analyze date coverage
if hasattr(df.index, 'min') and hasattr(df.index, 'max'):
try:
date_range = {
'start': str(df.index.min()),
'end': str(df.index.max()),
'days': (df.index.max() - df.index.min()).days if hasattr(df.index.max() - df.index.min(), 'days') else 'N/A'
}
quality_report['date_coverage'][name] = date_range
except Exception:
quality_report['date_coverage'][name] = 'Unable to determine'
# Simple outlier detection for numeric columns
numeric_cols = df.select_dtypes(include=[np.number]).columns
outlier_info = {}
for col in numeric_cols:
if col not in ['metric', 'slug']: # Skip metadata columns
try:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
if len(outliers) > 0:
outlier_info[col] = len(outliers)
except Exception:
continue
if outlier_info:
quality_report['outliers_detected'][name] = outlier_info
return quality_report
def create_data_dashboard(self) -> str:
"""Create a simple HTML dashboard summarizing the fetched data"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
total_datasets = len(self.fetched_data)
date_range = f"{self.config.from_date} to {self.config.to_date}"
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Santiment Data Dashboard</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background-color: #f0f0f0; padding: 20px; border-radius: 5px; }}
.section {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }}
.metric-card {{ display: inline-block; margin: 10px; padding: 15px; background-color: #f9f9f9; border-radius: 5px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<div class="header">
<h1>Santiment Data Dashboard</h1>
<p>Generated on: {timestamp}</p>
<p>Total Datasets: {total_datasets}</p>
<p>Date Range: {date_range}</p>
</div>
"""
# Add category summary
if self.fetched_data:
category_counts = {}
for key in self.fetched_data.keys():
if '_' in key:
category = key.split('_')[0]
category_counts[category] = category_counts.get(category, 0) + 1
html_content += """
<div class="section">
<h2>Categories Overview</h2>
"""
for category, count in sorted(category_counts.items()):
html_content += f'<div class="metric-card"><strong>{category}</strong><br>{count} datasets</div>'
html_content += "</div>"
# Add failed queries section
if self.failed_queries:
html_content += """
<div class="section">
<h2>Failed Queries</h2>
<table>
<tr><th>Metric</th><th>Slug</th><th>Error</th></tr>
"""
for failed in self.failed_queries[:10]: # Show first 10
metric = failed.get('metric', 'N/A')
slug = failed.get('slug', failed.get('slugs', 'N/A'))
error_text = str(failed.get('error', 'Unknown'))
error = error_text[:100] + '...' if len(error_text) > 100 else error_text
html_content += f"<tr><td>{metric}</td><td>{slug}</td><td>{error}</td></tr>"
html_content += "</table></div>"
html_content += "</body></html>"
# Save dashboard
dashboard_path = os.path.join(
self.config.export_directory,
f"santiment_dashboard_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
)
with open(dashboard_path, 'w') as f:
f.write(html_content)
logger.info(f"Dashboard created at {dashboard_path}")
return dashboard_path
def get_top_performing_assets(self, metric: str = 'price_usd', days: int = 30) -> pd.DataFrame:
"""
Analyze top performing assets based on a specific metric
Args:
metric: The metric to analyze performance on
days: Number of days to look back for performance calculation
Returns:
DataFrame with performance analysis
"""
performance_data = []
for name, df in self.fetched_data.items():
if isinstance(df, pd.DataFrame) and metric in str(name) and not df.empty:
try:
if 'slug' in df.columns:
# Group by slug and calculate performance
for slug in df['slug'].unique():
slug_data = df[df['slug'] == slug].copy()
if len(slug_data) >= 2:
slug_data = slug_data.sort_index()
# Calculate performance over the specified period
if len(slug_data) > days:
recent_data = slug_data.tail(days)
else:
recent_data = slug_data
if 'value' in recent_data.columns and not recent_data['value'].empty:
start_value = recent_data['value'].iloc[0]
end_value = recent_data['value'].iloc[-1]
if start_value and start_value != 0:
performance = ((end_value - start_value) / start_value) * 100
performance_data.append({
'slug': slug,
'metric': metric,
'start_value': start_value,
'end_value': end_value,
'performance_pct': performance,
'data_points': len(recent_data),
'period_days': days
})
except Exception as e:
logger.warning(f"Failed to analyze performance for {name}: {e}")
if performance_data:
performance_df = pd.DataFrame(performance_data)
return performance_df.sort_values('performance_pct', ascending=False)
else:
return pd.DataFrame()
def cleanup_export_directory(self) -> bool:
"""
Manually clean up the export directory.
Returns:
bool: True if cleanup was successful, False otherwise
"""
try:
self._cleanup_existing_files()
return True
except Exception as e:
logger.error(f"Manual cleanup failed: {e}")
return False
def get_api_key_status(self):
"""Get status information about API key usage"""
if not self.api_keys:
return {
"total_keys": 0,
"current_key": "None",
"rate_limit_switches": self.rate_limit_switches,
"current_key_preview": "No API key"
}
return {
"total_keys": len(self.api_keys),
"current_key": self.current_key_index + 1,
"rate_limit_switches": self.rate_limit_switches,
"current_key_preview": self.api_keys[self.current_key_index][:8] + "..."
}
def print_api_key_status(self):
"""Print API key usage status"""
status = self.get_api_key_status()
print(f"\n[API_STATUS] Using {status['total_keys']} API key(s)")
if status['total_keys'] > 0:
print(f"[API_STATUS] Current: Key #{status['current_key']} ({status['current_key_preview']})")
print(f"[API_STATUS] Rate limit switches: {status['rate_limit_switches']}")
if status['rate_limit_switches'] > 0:
print(f"[API_STATUS] Effective rate limit handling active")
else:
print(f"[API_STATUS] No API keys configured - using free tier")
print()
def save_configuration(self, config_path: str = None) -> str:
"""Save current configuration to a JSON file"""
if config_path is None:
config_path = os.path.join(self.config.export_directory, "santiment_config.json")
config_dict = {
'from_date': self.config.from_date,
'to_date': self.config.to_date,
'interval': self.config.interval,
'include_incomplete_data': self.config.include_incomplete_data,
'batch_size': self.config.batch_size,
'max_workers': self.config.max_workers,
'rate_limit_delay': self.config.rate_limit_delay,
'export_format': self.config.export_format,
'export_directory': self.config.export_directory,
'saved_at': datetime.now().isoformat()
}
with open(config_path, 'w') as f:
json.dump(config_dict, f, indent=2)
logger.info(f"Configuration saved to {config_path}")
return config_path
@classmethod
def load_configuration(cls, config_path: str) -> 'SantimentDataFetcher':
"""Load configuration from a JSON file and create a fetcher instance"""
with open(config_path, 'r') as f:
config_dict = json.load(f)
# Remove metadata fields
config_dict.pop('saved_at', None)
config = FetchConfig(**config_dict)
return cls(config=config)
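# Example round trip (illustrative; the key values are placeholders): persist the active
# configuration and rebuild a fetcher from it later.
#
#     fetcher = SantimentDataFetcher(api_key="key_one,key_two")
#     saved_path = fetcher.save_configuration()
#     restored = SantimentDataFetcher.load_configuration(saved_path)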
# Utility functions for easy usage
def cleanup_santiment_directory(directory_path: str = "data/santiment") -> bool:
"""
Utility function to clean up a Santiment data directory without creating a fetcher instance.
Args:
directory_path: Path to the directory to clean up
Returns:
bool: True if cleanup was successful, False otherwise
"""
import glob
import shutil
try:
if not os.path.exists(directory_path):
logger.info(f"Directory does not exist: {directory_path}")
return True
# Get all files in the directory
all_files = glob.glob(os.path.join(directory_path, "*"))
if all_files:
logger.info(f"Cleaning up {len(all_files)} existing files in {directory_path}")
for file_path in all_files:
try:
if os.path.isfile(file_path):
os.remove(file_path)
logger.debug(f"Removed file: {os.path.basename(file_path)}")
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
logger.debug(f"Removed directory: {os.path.basename(file_path)}")
except Exception as e:
logger.warning(f"Failed to remove {file_path}: {e}")
logger.info(f"Successfully cleaned up directory: {directory_path}")
else:
logger.info(f"Directory is already clean: {directory_path}")
return True
except Exception as e:
logger.error(f"Failed to cleanup directory {directory_path}: {e}")
return False
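# Example (illustrative): clear a previous run's exports before fetching fresh data.
#
#     if cleanup_santiment_directory("data/santiment"):
#         logger.info("Santiment export directory is clean")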
def fetch_quick_crypto_overview(assets: List[str] = None, api_key: str = None) -> Dict[str, pd.DataFrame]:
"""
Quick function to fetch essential crypto data for analysis
Args:
assets: List of asset slugs (defaults to five major cryptos)
api_key: Santiment API key
Returns:
Dictionary with essential data
"""
if assets is None:
assets = ['bitcoin', 'ethereum', 'solana', 'ripple', 'cardano']
config = FetchConfig(
from_date="2025-07-01", # Changed to be within free tier allowed range
to_date="2025-07-06", # Use last valid date for free tier
interval="30m",
export_format="parquet"
)
fetcher = SantimentDataFetcher(api_key=api_key, config=config)
# Fetch essential categories
essential_categories = ['financial', 'network_activity', 'exchange']
data = fetcher.fetch_comprehensive_data(
slugs=assets,
categories=essential_categories,
include_special_metrics=True,
include_sql_queries=False
)
return data
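# Example (illustrative): pull the quick overview for two assets and list what came back
# per category.
#
#     overview = fetch_quick_crypto_overview(assets=["bitcoin", "ethereum"])
#     for category, datasets in overview.items():
#         if isinstance(datasets, dict):
#             print(category, list(datasets.keys()))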
def create_crypto_report(assets: List[str], output_dir: str = "./crypto_report", api_key: str = None):
"""
Create a comprehensive crypto analysis report
Args:
assets: List of asset slugs to analyze
output_dir: Directory to save the report
api_key: Santiment API key(s) - can be comma-separated for multiple keys
"""
config = FetchConfig(
from_date="2025-07-01", # Changed to be within free tier allowed range
to_date="2025-07-06", # Use last valid date for free tier
interval="30m",
export_directory=output_dir,
export_format="parquet" # Use Parquet for output
)
fetcher = SantimentDataFetcher(api_key=api_key, config=config)
# Print API key status
fetcher.print_api_key_status()
# Fetch comprehensive data
logger.info("Fetching comprehensive cryptocurrency data...")
data = fetcher.fetch_comprehensive_data(
slugs=assets,
include_special_metrics=True,
include_sql_queries=True
)
# Export data
logger.info("Exporting data to files...")
exported_files = fetcher.export_data(combine_categories=False, include_metadata=True)
# Create dashboard
logger.info("Creating data dashboard...")
dashboard_path = fetcher.create_data_dashboard()
# Analyze data quality
logger.info("Analyzing data quality...")
quality_report = fetcher.analyze_data_quality()
# Save quality report
quality_path = os.path.join(output_dir, "data_quality_report.json")
with open(quality_path, 'w') as f:
json.dump(quality_report, f, indent=2, default=str)
# Print summary
fetcher.print_summary()
print(f"\nReport generated successfully!")
print(f"Dashboard: {dashboard_path}")
print(f"Data files: {len(exported_files)} files in {output_dir}")
print(f"Quality report: {quality_path}")
# Print final API key status
print("\n[FINAL_STATUS] Santiment API Key Usage Summary:")
fetcher.print_api_key_status()
# Example usage
def main():
# Get API key from environment (already loaded at module top)
santiment_api_key = os.getenv("SANTIMENT_API_KEY")
# Create fetcher instance
fetcher = SantimentDataFetcher(api_key=santiment_api_key)
# Print API key status
fetcher.print_api_key_status()
# DISABLED: Do not cleanup Santiment directory to preserve data
# cleanup_santiment_directory("./data/santiment")
print("[SANTIMENT] Data preservation mode - keeping existing data")
# Reduced scope for API conservation - only top 2 crypto assets
print("Fetching reduced crypto overview (API conservation mode)...")
# Note: Reduced from 5 to 2 assets to conserve API calls
overview_data = fetch_quick_crypto_overview(['bitcoin', 'ethereum'], api_key=santiment_api_key)
# Comprehensive analysis - reduced scope
print("\nCreating conservative crypto report...")
# Note: Reduced scope - only Bitcoin and Ethereum to preserve API limits
create_crypto_report(
assets=['bitcoin', 'ethereum'], # Reduced from 5 to 2 assets
output_dir="./data/santiment",
api_key=santiment_api_key
)
# Print final API key status
print("\n[FINAL_STATUS] Santiment API Key Usage Summary:")
fetcher.print_api_key_status()
if __name__ == "__main__":
main()