|
""" |
|
Santiment-Crypto Features Merger |
|
=============================== |
|
|
|
This script merges the Santiment merged features with the existing crypto features.
It reads santiment/merged_features.parquet and merged/features/crypto_features.parquet
(both resolved under the configured data directory), aligns them by symbol and
timestamp, and creates a unified feature set.
|
|
|
Features: |
|
- Loads Santiment merged features (parquet)
- Loads existing crypto features (parquet)
- Symbol alignment and normalization
- Time-shifted merging that matches rows by weekday and time of day
- Feature name conflict resolution (Santiment columns get a "santiment_" prefix)
- Creates a unified feature set
|
|
|
Author: AI Assistant |
|
Date: August 2025 |
|
""" |
|
|
|
import os |
|
import sys |
|
import pandas as pd |
|
import numpy as np |
|
import pickle |
|
from pathlib import Path |
|
from datetime import datetime, timedelta |
|
import logging |
|
from typing import List, Dict, Optional, Tuple, Union |
|
|
|
|
|
def _load_data_dir():
    """Return the project's DATA_DIR, trying each known config module in turn.

    ``src.config`` is tried first, then a top-level ``config`` module.  Any
    failure while importing either one (not only ImportError) moves on to the
    next option; if both fail the literal "/data" is used.
    """
    try:
        from src.config import DATA_DIR
    except Exception:
        pass
    else:
        return DATA_DIR

    try:
        from config import DATA_DIR
    except Exception:
        return "/data"
    return DATA_DIR


# Root directory that relative data paths in this module are resolved against.
CFG_DATA_DIR = _load_data_dir()
|
|
|
|
|
def _resolve_under_data(path_like: str | os.PathLike) -> Path: |
|
p = Path(path_like) |
|
if p.is_absolute(): |
|
return p |
|
parts = p.parts |
|
if parts and parts[0].lower() == "data": |
|
rel = Path(*parts[1:]) if len(parts) > 1 else Path() |
|
else: |
|
rel = p |
|
return Path(CFG_DATA_DIR) / rel |
|
|
|
|
|
# Configure the root logger at import time (no effect if handlers are already
# installed) and create this module's logger.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

logger = logging.getLogger(__name__)
|
|
|
class SantimentCryptoMerger:
    """
    Merger for combining Santiment features with existing crypto features.

    ``run_merge()`` drives the whole pipeline: it loads both parquet inputs,
    attaches the Santiment columns to every crypto row, writes the result as
    parquet and prints a summary.  Santiment rows are matched to crypto rows
    per symbol by weekday and time of day rather than by calendar date, so the
    two sources do not need overlapping date ranges.
    """

    def __init__(self,
                 santiment_file: str = "data/santiment/merged_features.parquet",
                 crypto_file: str = "data/merged/features/crypto_features.parquet",
                 output_file: str = "data/merged/features/crypto_features.parquet",
                 time_tolerance_hours: int = 1):
        """
        Initialize the merger

        Args:
            santiment_file: Path to original Santiment merged features parquet file
            crypto_file: Path to original crypto features file (crypto_features.parquet)
            output_file: Path for the final merged output file (will replace crypto_features.parquet)
            time_tolerance_hours: Time tolerance for merging (hours).  Stored as
                ``self.time_tolerance`` but not consulted by the merge itself.

        Relative paths are resolved under the configured data directory.  The
        parent directory of ``output_file`` is created if it does not exist.
        """
        self.santiment_file = _resolve_under_data(santiment_file)
        self.crypto_file = _resolve_under_data(crypto_file)
        self.output_file = _resolve_under_data(output_file)
        self.time_tolerance = timedelta(hours=time_tolerance_hours)

        self.output_file.parent.mkdir(parents=True, exist_ok=True)

        # Populated by the load_* / merge_datasets steps.
        self.santiment_data: Optional[pd.DataFrame] = None
        self.crypto_data: Optional[pd.DataFrame] = None
        self.merged_data: Optional[pd.DataFrame] = None

        # Counters reported by print_summary().
        self.stats = {
            'santiment_records': 0,
            'crypto_records': 0,
            'common_symbols': 0,
            'merged_records': 0,
            'santiment_features': 0,
            'crypto_features': 0,
            'total_features': 0,
            'time_range': {}
        }

        self.symbol_normalizer = self._setup_symbol_normalizer()

    def _setup_symbol_normalizer(self):
        """Setup symbol normalization mapping (slug / name / ticker -> ticker)"""
        return {
            'bitcoin': 'BTC', 'btc': 'BTC', 'Bitcoin': 'BTC', 'BTC': 'BTC',
            'ethereum': 'ETH', 'eth': 'ETH', 'Ethereum': 'ETH', 'ETH': 'ETH',
            'ripple': 'XRP', 'xrp': 'XRP', 'Ripple': 'XRP', 'XRP': 'XRP',
            'solana': 'SOL', 'sol': 'SOL', 'Solana': 'SOL', 'SOL': 'SOL',
            'cardano': 'ADA', 'ada': 'ADA', 'Cardano': 'ADA', 'ADA': 'ADA',
            'polkadot': 'DOT', 'dot': 'DOT', 'Polkadot': 'DOT', 'DOT': 'DOT',
            'chainlink': 'LINK', 'link': 'LINK', 'Chainlink': 'LINK', 'LINK': 'LINK',
            'litecoin': 'LTC', 'ltc': 'LTC', 'Litecoin': 'LTC', 'LTC': 'LTC',
            'bitcoin-cash': 'BCH', 'bch': 'BCH', 'Bitcoin Cash': 'BCH', 'BCH': 'BCH',
            'stellar': 'XLM', 'xlm': 'XLM', 'Stellar': 'XLM', 'XLM': 'XLM',
            'ethereum-classic': 'ETC', 'etc': 'ETC', 'Ethereum Classic': 'ETC', 'ETC': 'ETC',
            'eos': 'EOS', 'EOS': 'EOS'
        }

    def normalize_symbol(self, symbol: str) -> str:
        """
        Normalize a symbol to canonical (ticker) format

        The exact spelling is looked up first.  If that misses, the lookup is
        retried on the whitespace-stripped value, its lower-case form and its
        lower-case form with spaces replaced by hyphens, so that e.g.
        ``'BITCOIN'`` and ``'bitcoin cash'`` also map to their tickers.
        Unknown symbols are returned stripped and upper-cased.
        """
        mapping = self.symbol_normalizer
        if symbol in mapping:
            return mapping[symbol]

        cleaned = symbol.strip()
        lowered = cleaned.lower()
        for candidate in (cleaned, lowered, lowered.replace(' ', '-')):
            if candidate in mapping:
                return mapping[candidate]
        return cleaned.upper()

    def load_santiment_data(self) -> bool:
        """
        Load the Santiment merged features and prepare them for merging

        The frame is indexed by a UTC ``DatetimeIndex`` (taken from the
        existing index or from a ``datetime`` column), gets a normalized
        ``symbol`` column (from ``slug`` when present, otherwise ``symbol``),
        and every other column is prefixed with ``santiment_``.
        ``self.santiment_data`` is only assigned when loading succeeds.

        Returns:
            True if successful, False otherwise
        """
        try:
            if not self.santiment_file.exists():
                logger.error(f"Santiment file not found: {self.santiment_file}")
                return False

            logger.info(f"Loading Santiment data from {self.santiment_file}")
            data = pd.read_parquet(self.santiment_file)

            if not isinstance(data.index, pd.DatetimeIndex):
                if 'datetime' not in data.columns:
                    logger.error("No datetime index found in Santiment data")
                    return False
                data = data.set_index('datetime')
                if not isinstance(data.index, pd.DatetimeIndex):
                    # The column held strings/objects: coerce it, otherwise the
                    # timezone handling below has no ``.tz`` to work with.
                    data.index = pd.to_datetime(data.index, utc=True)

            if data.index.tz is None:
                data.index = data.index.tz_localize('UTC')
            else:
                data.index = data.index.tz_convert('UTC')

            if 'slug' in data.columns:
                data['symbol'] = data['slug'].apply(self.normalize_symbol)
                data = data.drop(columns=['slug'])
            elif 'symbol' in data.columns:
                data['symbol'] = data['symbol'].apply(self.normalize_symbol)
            else:
                logger.error("No symbol/slug column found in Santiment data")
                return False

            # Prefix feature columns so they cannot clash with crypto columns.
            feature_cols = [col for col in data.columns if col != 'symbol']
            data = data.rename(columns={col: f"santiment_{col}" for col in feature_cols})

            self.santiment_data = data
            self.stats['santiment_records'] = len(data)
            self.stats['santiment_features'] = len(feature_cols)

            logger.info(f"Loaded Santiment data: {len(data)} records, {len(data.columns)} columns")
            logger.info(f"Santiment symbols: {sorted(data['symbol'].unique())}")
            logger.info(f"Santiment date range: {data.index.min()} to {data.index.max()}")

            return True

        except Exception as e:
            logger.error(f"Failed to load Santiment data: {e}")
            return False

    def load_crypto_data(self) -> bool:
        """
        Load existing crypto features

        Requires an ``interval_timestamp`` column and one of the columns
        ``symbol``, ``Symbol``, ``ticker``, ``asset`` or ``slug``; the latter
        is moved to a normalized ``symbol`` column.  ``self.crypto_data`` is
        only assigned when loading succeeds.

        Returns:
            True if successful, False otherwise
        """
        try:
            if not self.crypto_file.exists():
                logger.error(f"Crypto file not found: {self.crypto_file}")
                return False

            logger.info(f"Loading crypto data from {self.crypto_file}")
            data = pd.read_parquet(self.crypto_file)

            if 'interval_timestamp' not in data.columns:
                logger.error("No interval_timestamp column found in crypto data")
                return False

            symbol_col = next(
                (col for col in ['symbol', 'Symbol', 'ticker', 'asset', 'slug']
                 if col in data.columns),
                None,
            )
            if symbol_col is None:
                logger.error("No symbol column found in crypto data")
                logger.info(f"Available columns: {list(data.columns)}")
                return False

            if symbol_col != 'symbol':
                data['symbol'] = data[symbol_col]
                data = data.drop(columns=[symbol_col])

            data['symbol'] = data['symbol'].apply(self.normalize_symbol)

            self.crypto_data = data
            self.stats['crypto_records'] = len(data)
            self.stats['crypto_features'] = len([col for col in data.columns if col != 'symbol'])

            logger.info(f"Loaded crypto data: {len(data)} records, {len(data.columns)} columns")
            logger.info(f"Crypto symbols: {sorted(data['symbol'].unique())}")
            logger.info(f"Crypto date range: {data['interval_timestamp'].min()} to {data['interval_timestamp'].max()}")

            return True

        except Exception as e:
            logger.error(f"Failed to load crypto data: {e}")
            return False

    @staticmethod
    def _match_santiment_positions(crypto_times, santiment_index) -> np.ndarray:
        """
        Pick, for every crypto timestamp, the Santiment row to copy from.

        A Santiment row is eligible when it falls on the same weekday as the
        crypto timestamp; among eligible rows the one whose minute-of-day is
        closest (plain absolute difference, no wrap-around at midnight) wins,
        and ties go to the row that appears first in ``santiment_index``.

        Args:
            crypto_times: Datetime-like values, one per crypto row.
            santiment_index: Datetime-like values, one per Santiment row.

        Returns:
            int64 array with one entry per crypto row: the position of the
            chosen Santiment row, or -1 when no row shares the weekday (or the
            crypto timestamp is missing).
        """
        crypto_idx = pd.DatetimeIndex(crypto_times)
        sant_idx = pd.DatetimeIndex(santiment_index)

        positions = np.full(len(crypto_idx), -1, dtype=np.int64)
        if len(crypto_idx) == 0 or len(sant_idx) == 0:
            return positions

        # Floats so that missing timestamps become NaN and never match a weekday.
        crypto_weekday = np.asarray(crypto_idx.dayofweek, dtype=float)
        crypto_minute = np.asarray(crypto_idx.hour * 60 + crypto_idx.minute, dtype=float)
        sant_weekday = np.asarray(sant_idx.dayofweek, dtype=float)
        sant_minute = np.asarray(sant_idx.hour * 60 + sant_idx.minute, dtype=float)

        for weekday in range(7):
            rows = np.flatnonzero(crypto_weekday == weekday)
            candidates = np.flatnonzero(sant_weekday == weekday)
            if rows.size == 0 or candidates.size == 0:
                continue

            # Collapse candidates to one entry per distinct minute-of-day (its
            # first occurrence) and order those entries by where they first
            # appear, so argmin's "first minimum" equals "earliest row".
            minute_values, first_seen = np.unique(sant_minute[candidates], return_index=True)
            by_first_seen = np.argsort(first_seen, kind='stable')
            minute_values = minute_values[by_first_seen]
            candidate_positions = candidates[first_seen[by_first_seen]]

            # Likewise only distinct crypto minutes need a distance row, which
            # bounds the comparison matrix at 1440 x 1440.
            wanted, inverse = np.unique(crypto_minute[rows], return_inverse=True)
            inverse = inverse.reshape(-1)
            nearest = np.abs(wanted[:, None] - minute_values[None, :]).argmin(axis=1)
            positions[rows] = candidate_positions[nearest[inverse]]

        return positions

    @staticmethod
    def _add_empty_santiment_columns(frame: pd.DataFrame, santiment_cols: List[str]) -> pd.DataFrame:
        """Set every column in ``santiment_cols`` to NaN on ``frame`` (in place) and return it."""
        for col in santiment_cols:
            frame[col] = np.nan
        return frame

    def apply_time_shift_merge(self, crypto_df, santiment_df, symbol):
        """
        Apply time-shifted merge for a specific symbol using day-of-week matching
        This function preserves ALL crypto records and adds Santiment features where possible

        Each crypto row receives the values of the Santiment row chosen by
        ``_match_santiment_positions``.  Rows without a match keep NaN in the
        Santiment columns.  Integer Santiment columns are returned as float64
        so the column dtype does not depend on whether every row matched.

        Args:
            crypto_df: Crypto data for one symbol (needs ``interval_timestamp``
                in epoch milliseconds)
            santiment_df: Santiment data for one symbol, indexed by datetime
            symbol: Symbol being processed (used for logging only)

        Returns:
            Merged DataFrame with ALL crypto records plus Santiment features
        """
        logger.info(f" Time-shift merging {len(crypto_df)} crypto records for {symbol}")

        result_df = crypto_df.copy()
        santiment_cols = [col for col in santiment_df.columns if col != 'symbol']

        crypto_times = pd.to_datetime(crypto_df['interval_timestamp'], unit='ms', utc=True)
        positions = self._match_santiment_positions(crypto_times, santiment_df.index)

        # Positional lookup: a fresh 0..n-1 index makes the row positions valid
        # labels, and reindex() yields all-NaN rows for the -1 placeholders.
        picked = santiment_df[santiment_cols].reset_index(drop=True).reindex(positions)
        for col in santiment_cols:
            column = picked[col]
            if pd.api.types.is_integer_dtype(column.dtype):
                column = column.astype('float64')
            # Assign as a bare array so pandas does not align on index labels.
            result_df[col] = column.to_numpy()

        logger.info(f" Preserved all {len(result_df)} crypto records for {symbol}")

        if santiment_cols and len(result_df):
            non_null_count = result_df[santiment_cols[0]].notna().sum()
            logger.info(f" Added Santiment features to {non_null_count}/{len(result_df)} records ({non_null_count/len(result_df)*100:.1f}%)")

        return result_df

    def _log_date_ranges(self) -> None:
        """Log both sources' date ranges and warn when they do not overlap."""
        sant_start, sant_end = self.santiment_data.index.min(), self.santiment_data.index.max()

        try:
            crypto_timestamps = pd.to_datetime(self.crypto_data['interval_timestamp'], unit='ms', utc=True)
            crypto_start, crypto_end = crypto_timestamps.min(), crypto_timestamps.max()
        except Exception as e:
            logger.warning(f"Could not calculate date ranges for comparison: {e}")
            logger.info(f"Santiment date range: {sant_start} to {sant_end}")
            logger.warning("Using time-shift merge strategy (date comparison skipped)")
            return

        logger.info(f"Crypto date range: {crypto_start} to {crypto_end}")
        logger.info(f"Santiment date range: {sant_start} to {sant_end}")

        if pd.isna(crypto_start) or pd.isna(crypto_end):
            logger.warning("Using time-shift merge strategy (date comparison skipped)")
        elif not ((crypto_start <= sant_end) and (sant_start <= crypto_end)):
            logger.warning("No date overlap detected - using time-shift merge strategy")

    def _record_merge_stats(self) -> None:
        """Update record/feature counts and the covered time range from ``self.merged_data``."""
        merged = self.merged_data
        self.stats['merged_records'] = len(merged)
        self.stats['total_features'] = len([col for col in merged.columns if col != 'symbol'])

        start_time = pd.to_datetime(merged['interval_timestamp'].min(), unit='ms', utc=True)
        end_time = pd.to_datetime(merged['interval_timestamp'].max(), unit='ms', utc=True)
        self.stats['time_range'] = {
            'start': str(start_time),
            'end': str(end_time),
            'total_days': (end_time - start_time).days
        }

    def merge_datasets(self) -> bool:
        """
        Merge Santiment and crypto datasets using time-shift logic

        Symbols present in both sources are merged with
        ``apply_time_shift_merge``; symbols present only in the crypto data are
        kept with NaN Santiment columns.  When the sources share no symbol at
        all, the crypto data is emitted unchanged apart from NaN Santiment
        columns.  Symbols are processed in sorted order and the result is
        stably sorted by ``interval_timestamp`` so repeated runs produce the
        same row order.

        Returns:
            True if successful, False otherwise
        """
        try:
            if self.santiment_data is None or self.crypto_data is None:
                logger.error("Both datasets must be loaded before merging")
                return False

            logger.info("Starting time-shifted merge process...")
            self._log_date_ranges()

            santiment_symbols = set(self.santiment_data['symbol'].unique())
            crypto_symbols = set(self.crypto_data['symbol'].unique())
            common_symbols = santiment_symbols & crypto_symbols
            self.stats['common_symbols'] = len(common_symbols)

            logger.info(f"Common symbols found: {len(common_symbols)} - {sorted(common_symbols)}")

            sant_cols = [col for col in self.santiment_data.columns if col != 'symbol']

            if not common_symbols:
                logger.error("No common symbols found between datasets")
                logger.info("Falling back to crypto-only merged output with empty Santiment features")
                crypto_only = self._add_empty_santiment_columns(self.crypto_data.copy(), sant_cols)
                self.merged_data = crypto_only.reset_index(drop=True)
                self._record_merge_stats()
                return True

            merged_parts = []
            total_merged_records = 0

            for symbol in sorted(common_symbols):
                logger.info(f"Processing {symbol} with time-shift merge...")
                sant_symbol = self.santiment_data[self.santiment_data['symbol'] == symbol]
                crypto_symbol = self.crypto_data[self.crypto_data['symbol'] == symbol]

                merged_symbol = self.apply_time_shift_merge(crypto_symbol, sant_symbol, symbol)
                merged_parts.append(merged_symbol.reset_index(drop=True))
                total_merged_records += len(merged_symbol)

                logger.info(f" Processed {len(crypto_symbol)} crypto records for {symbol}")

            for symbol in sorted(crypto_symbols - common_symbols):
                logger.info(f"Adding crypto-only symbol: {symbol}")
                crypto_only = self.crypto_data[self.crypto_data['symbol'] == symbol].copy()
                crypto_only = self._add_empty_santiment_columns(crypto_only, sant_cols)
                merged_parts.append(crypto_only.reset_index(drop=True))
                total_merged_records += len(crypto_only)

            if not merged_parts:
                logger.error("No data to merge")
                return False

            merged = pd.concat(merged_parts, axis=0, ignore_index=True)
            # Stable sort: rows with equal timestamps keep the symbol order above.
            self.merged_data = merged.sort_values(
                'interval_timestamp', kind='mergesort'
            ).reset_index(drop=True)

            self._record_merge_stats()

            logger.info(f"Total crypto records processed: {total_merged_records}")
            logger.info("Time-shifted merge completed successfully!")
            return True

        except Exception as e:
            logger.error(f"Failed to merge datasets: {e}")
            return False

    def save_merged_data(self) -> bool:
        """
        Save the merged dataset as snappy-compressed parquet

        The original crypto file is copied to ``<name>.backup.parquet`` first,
        but only when the output path differs from the crypto input path.
        NOTE(review): with the default arguments both paths are identical, so
        the input is overwritten without a backup - confirm this is intended.

        Returns:
            True if successful, False otherwise
        """
        try:
            if self.merged_data is None or self.merged_data.empty:
                logger.error("No merged data to save")
                return False

            if self.crypto_file != self.output_file and self.crypto_file.exists():
                import shutil

                backup_file = self.crypto_file.with_suffix('.backup.parquet')
                shutil.copy2(self.crypto_file, backup_file)
                logger.info(f"Backed up original crypto file to: {backup_file}")

            logger.info(f"Saving merged data to {self.output_file}")
            self.merged_data.to_parquet(self.output_file, index=False, compression='snappy')

            logger.info(f"Merged data saved successfully!")
            logger.info(f"Enhanced crypto file: {self.output_file}")
            return True

        except Exception as e:
            logger.error(f"Failed to save merged data: {e}")
            return False

    def print_summary(self):
        """Print merge summary (input sizes, merge results, time range, final dataset)"""
        print("\n" + "="*70)
        print("SANTIMENT-CRYPTO MERGER SUMMARY")
        print("="*70)

        print(f"\nInput Data:")
        print(f" Santiment records: {self.stats['santiment_records']:,}")
        print(f" Santiment features: {self.stats['santiment_features']}")
        print(f" Crypto records: {self.stats['crypto_records']:,}")
        print(f" Crypto features: {self.stats['crypto_features']}")

        print(f"\nMerge Results:")
        print(f" Common symbols: {self.stats['common_symbols']}")
        print(f" Final records: {self.stats['merged_records']:,}")
        print(f" Total features: {self.stats['total_features']}")

        if self.stats['time_range']:
            print(f"\nTime Range:")
            print(f" Start: {self.stats['time_range']['start']}")
            print(f" End: {self.stats['time_range']['end']}")
            print(f" Total days: {self.stats['time_range']['total_days']}")

        if self.merged_data is not None:
            total_cells = len(self.merged_data) * len(self.merged_data.columns)
            null_cells = self.merged_data.isnull().sum().sum()
            # Guard the ratio so an empty frame reports 0% instead of dividing by zero.
            null_pct = (null_cells / total_cells) * 100 if total_cells else 0.0

            print(f"\nFinal Dataset:")
            print(f" Memory usage: {self.merged_data.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
            print(f" Null percentage: {null_pct:.2f}%")

            symbol_dist = self.merged_data['symbol'].value_counts()
            print(f"\nSymbol Distribution:")
            for symbol, count in symbol_dist.head(10).items():
                print(f" {symbol}: {count:,} records")

        print("="*70)

    def run_merge(self) -> bool:
        """
        Run the complete merge process

        Loads both inputs, merges, saves and prints the summary.  The crypto
        data is mandatory; when only the Santiment data fails to load, the
        crypto data is written out unchanged (crypto-only output).

        Returns:
            True if successful, False otherwise
        """
        try:
            logger.info("Starting Santiment-Crypto merge process...")

            sant_ok = self.load_santiment_data()
            crypto_ok = self.load_crypto_data()

            if not crypto_ok:
                return False

            if not sant_ok:
                logger.warning("Proceeding without Santiment data; emitting crypto-only output")
                self.merged_data = self.crypto_data.copy()
                # Keep the summary's record/feature counts in line with the output.
                self._record_merge_stats()

                if not self.save_merged_data():
                    return False
                self.print_summary()
                logger.info("Santiment-Crypto merge completed successfully with crypto-only output")
                return True

            if not self.merge_datasets():
                return False

            if not self.save_merged_data():
                return False

            self.print_summary()

            logger.info("Santiment-Crypto merge completed successfully!")
            return True

        except Exception as e:
            logger.error(f"Merge process failed: {e}")
            return False
|
|
|
|
|
def main():
    """Main function

    Builds a merger for the default Santiment input and crypto output
    locations, runs the complete merge and returns its success flag.
    """
    default_paths = {
        "santiment_file": "data/santiment/merged_features.parquet",
        "output_file": "data/merged/features/crypto_features.parquet",
    }
    return SantimentCryptoMerger(time_tolerance_hours=1, **default_paths).run_merge()


if __name__ == "__main__":
    main()
|
|