advisorai-data-enhanced / src /merge /merge_santiment_to_crypto.py
Maaroufabousaleh
f
c49b21b
raw
history blame
14.5 kB
#!/usr/bin/env python3
"""
Merge Santiment Features with Crypto Features
============================================
This script merges Santiment data with existing crypto features by matching:
- symbol (crypto) = slug (santiment)
- interval_timestamp (crypto) = datetime (santiment) with ±1 hour tolerance
The result includes all original crypto features plus all Santiment features.
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
from pathlib import Path
import logging
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Resolve data directory base
try:
from src.config import DATA_DIR as CFG_DATA_DIR
except Exception:
try:
from config import DATA_DIR as CFG_DATA_DIR
except Exception:
CFG_DATA_DIR = "/data"
def _resolve_under_data(path_like: str | os.PathLike) -> Path:
p = Path(path_like)
if p.is_absolute():
return p
parts = p.parts
if parts and parts[0].lower() == "data":
rel = Path(*parts[1:]) if len(parts) > 1 else Path()
else:
rel = p
return Path(CFG_DATA_DIR) / rel
def convert_timestamp_to_datetime(timestamp_ms):
"""
Convert millisecond timestamp to datetime
Args:
timestamp_ms: Timestamp in milliseconds
Returns:
Datetime object
"""
return pd.to_datetime(timestamp_ms, unit='ms', utc=True)
def normalize_symbol_mapping():
"""
Create symbol mapping between crypto symbols and Santiment slugs
Returns:
Dictionary mapping crypto symbols to Santiment slugs
"""
# Both crypto and Santiment use the same symbol names
return {
'BTC': 'BTC',
'ETH': 'ETH',
'ADA': 'ADA',
'SOL': 'SOL',
'XRP': 'XRP'
}
def load_data():
"""
Load crypto features and Santiment features
Returns:
Tuple of (crypto_df, santiment_df)
"""
logger.info("Loading data files...")
# Load crypto features
crypto_file = _resolve_under_data('data/merged/features/crypto_features.parquet')
if not os.path.exists(crypto_file):
raise FileNotFoundError(f"Crypto features file not found: {crypto_file}")
crypto_df = pd.read_parquet(crypto_file)
logger.info(f"Loaded crypto features: {crypto_df.shape[0]} rows, {crypto_df.shape[1]} columns")
# Load Santiment features
santiment_file = _resolve_under_data('data/santiment/merged_features.parquet')
if not os.path.exists(santiment_file):
logger.warning(f"Santiment features file not found: {santiment_file}")
logger.warning("Proceeding without Santiment features (crypto-only output)")
return crypto_df, None
santiment_df = pd.read_parquet(santiment_file)
logger.info(f"Loaded Santiment features: {santiment_df.shape[0]} rows, {santiment_df.shape[1]} columns")
return crypto_df, santiment_df
def prepare_crypto_data(crypto_df):
"""
Prepare crypto data for merging
Args:
crypto_df: Crypto features DataFrame
Returns:
Prepared crypto DataFrame
"""
logger.info("Preparing crypto data...")
# Convert interval_timestamp to datetime
crypto_df = crypto_df.copy()
crypto_df['datetime'] = convert_timestamp_to_datetime(crypto_df['interval_timestamp'])
# Set datetime as index for easier merging
crypto_df.set_index('datetime', inplace=True)
logger.info(f"Crypto date range: {crypto_df.index.min()} to {crypto_df.index.max()}")
logger.info(f"Crypto symbols: {sorted(crypto_df['symbol'].unique())}")
return crypto_df
def prepare_santiment_data(santiment_df):
"""
Prepare Santiment data for merging
Args:
santiment_df: Santiment features DataFrame
Returns:
Prepared Santiment DataFrame
"""
logger.info("Preparing Santiment data...")
santiment_df = santiment_df.copy()
# Ensure datetime index is timezone-aware (convert to UTC if needed)
if santiment_df.index.tz is None:
santiment_df.index = pd.to_datetime(santiment_df.index, utc=True)
elif str(santiment_df.index.tz) != 'UTC':
santiment_df.index = santiment_df.index.tz_convert('UTC')
logger.info(f"Santiment date range: {santiment_df.index.min()} to {santiment_df.index.max()}")
logger.info(f"Santiment slugs: {sorted(santiment_df['slug'].unique())}")
return santiment_df
def merge_with_tolerance(crypto_df, santiment_df, symbol_mapping, tolerance_hours=1):
"""
Merge crypto and Santiment data with time tolerance
Args:
crypto_df: Prepared crypto DataFrame
santiment_df: Prepared Santiment DataFrame
symbol_mapping: Dict mapping crypto symbols to Santiment slugs
tolerance_hours: Time tolerance in hours for matching
Returns:
Merged DataFrame
"""
logger.info(f"Starting merge with ±{tolerance_hours} hour tolerance...")
merged_results = []
tolerance = pd.Timedelta(hours=tolerance_hours)
# Track merge statistics
total_crypto_records = len(crypto_df)
successful_matches = 0
for symbol, slug in symbol_mapping.items():
logger.info(f"Processing {symbol}{slug}")
# Filter data for current symbol/slug
crypto_symbol = crypto_df[crypto_df['symbol'] == symbol].copy()
santiment_slug = santiment_df[santiment_df['slug'] == slug].copy()
if crypto_symbol.empty:
logger.warning(f"No crypto data found for symbol: {symbol}")
continue
if santiment_slug.empty:
logger.warning(f"No Santiment data found for slug: {slug}")
# Add crypto data with null Santiment features
crypto_symbol_with_nulls = add_null_santiment_features(crypto_symbol, santiment_df.columns)
merged_results.append(crypto_symbol_with_nulls)
continue
# Perform time-tolerance merge
merged_symbol = merge_by_time_tolerance(crypto_symbol, santiment_slug, tolerance)
merged_results.append(merged_symbol)
matches = len(merged_symbol)
successful_matches += matches
logger.info(f" Matched {matches}/{len(crypto_symbol)} records for {symbol}")
# Combine all results
if merged_results:
merged_df = pd.concat(merged_results, ignore_index=False)
logger.info(f"Merge completed: {successful_matches}/{total_crypto_records} records matched ({successful_matches/total_crypto_records*100:.1f}%)")
else:
logger.error("No data could be merged!")
return None
return merged_df
def merge_by_time_tolerance(crypto_symbol, santiment_slug, tolerance):
"""
Merge crypto and Santiment data for a single symbol with time tolerance
Args:
crypto_symbol: Crypto data for one symbol
santiment_slug: Santiment data for one slug
tolerance: Time tolerance as Timedelta
Returns:
Merged DataFrame for this symbol
"""
merged_records = []
for crypto_time, crypto_row in crypto_symbol.iterrows():
# Find Santiment records within tolerance
time_diff = np.abs(santiment_slug.index - crypto_time)
within_tolerance = time_diff <= tolerance
if within_tolerance.any():
# Get the closest match within tolerance
closest_idx = time_diff[within_tolerance].idxmin()
santiment_row = santiment_slug.loc[closest_idx]
# Combine crypto and Santiment features
combined_row = crypto_row.copy()
# Add Santiment features (excluding 'slug' to avoid duplication)
for col in santiment_slug.columns:
if col != 'slug': # Don't overwrite symbol with slug
combined_row[f'santiment_{col}'] = santiment_row[col]
merged_records.append(combined_row)
else:
# No match found - add with null Santiment features
combined_row = crypto_row.copy()
for col in santiment_slug.columns:
if col != 'slug':
combined_row[f'santiment_{col}'] = np.nan
merged_records.append(combined_row)
return pd.DataFrame(merged_records, index=crypto_symbol.index)
def add_null_santiment_features(crypto_df, santiment_columns):
"""
Add null Santiment features to crypto data when no Santiment data exists
Args:
crypto_df: Crypto DataFrame
santiment_columns: Santiment column names
Returns:
Crypto DataFrame with null Santiment features
"""
crypto_with_nulls = crypto_df.copy()
for col in santiment_columns:
if col != 'slug': # Don't add slug column
crypto_with_nulls[f'santiment_{col}'] = np.nan
return crypto_with_nulls
def analyze_merge_quality(merged_df):
"""
Analyze the quality of the merge
Args:
merged_df: Merged DataFrame
Returns:
Dictionary with merge quality metrics
"""
logger.info("Analyzing merge quality...")
# Count Santiment features (exclude slug)
santiment_cols = [col for col in merged_df.columns if col.startswith('santiment_')]
analysis = {
'total_records': len(merged_df),
'santiment_features_added': len(santiment_cols),
'symbols_processed': sorted(merged_df['symbol'].unique()),
'completeness_by_symbol': {},
'overall_completeness': 0.0
}
# Analyze completeness by symbol
for symbol in analysis['symbols_processed']:
symbol_data = merged_df[merged_df['symbol'] == symbol]
# Calculate how many records have non-null Santiment data
non_null_counts = symbol_data[santiment_cols].notna().sum(axis=1)
records_with_santiment = (non_null_counts > 0).sum()
completeness = records_with_santiment / len(symbol_data) * 100
analysis['completeness_by_symbol'][symbol] = {
'total_records': len(symbol_data),
'records_with_santiment': records_with_santiment,
'completeness_pct': completeness
}
# Overall completeness
all_santiment_data = merged_df[santiment_cols].notna().sum(axis=1)
records_with_any_santiment = (all_santiment_data > 0).sum()
analysis['overall_completeness'] = records_with_any_santiment / len(merged_df) * 100
return analysis
def save_results(merged_df, analysis):
"""
Save merged results and analysis
Args:
merged_df: Merged DataFrame
analysis: Merge quality analysis
"""
logger.info("Saving results...")
# Create output directory
output_dir = 'data/merged/features'
os.makedirs(output_dir, exist_ok=True)
# Save merged features
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = os.path.join(output_dir, f'crypto_with_santiment_features_{timestamp}.parquet')
# Reset index to include datetime as column
merged_df_export = merged_df.reset_index()
merged_df_export.to_parquet(output_file, index=False)
logger.info(f"Merged features saved to: {output_file}")
# Save analysis report
analysis_file = os.path.join(output_dir, f'santiment_merge_analysis_{timestamp}.json')
import json
with open(analysis_file, 'w') as f:
json.dump(analysis, f, indent=2, default=str)
logger.info(f"Analysis saved to: {analysis_file}")
return output_file, analysis_file
def main():
"""
Main merge process
"""
logger.info("Starting Santiment-Crypto merge process...")
try:
# Load data
crypto_df, santiment_df = load_data()
# Prepare data
crypto_prepared = prepare_crypto_data(crypto_df)
if santiment_df is None:
logger.warning("No Santiment data available; exporting crypto-only dataset")
# Export crypto-only with datetime included
output_dir = 'data/merged/features'
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = os.path.join(output_dir, f'crypto_with_santiment_features_{timestamp}.parquet')
crypto_prepared.reset_index().to_parquet(output_file, index=False)
logger.info(f"Crypto-only features saved to: {output_file}")
return
santiment_prepared = prepare_santiment_data(santiment_df)
# Define symbol mapping
symbol_mapping = normalize_symbol_mapping()
logger.info(f"Symbol mapping: {symbol_mapping}")
# Perform merge
merged_df = merge_with_tolerance(
crypto_prepared,
santiment_prepared,
symbol_mapping,
tolerance_hours=1
)
if merged_df is None:
logger.error("Merge failed!")
return
# Analyze results
analysis = analyze_merge_quality(merged_df)
# Print summary
print("\n" + "="*60)
print("SANTIMENT-CRYPTO MERGE SUMMARY")
print("="*60)
print(f"Total records: {analysis['total_records']}")
print(f"Santiment features added: {analysis['santiment_features_added']}")
print(f"Overall completeness: {analysis['overall_completeness']:.1f}%")
print(f"Symbols processed: {analysis['symbols_processed']}")
print(f"\nCompleteness by symbol:")
for symbol, stats in analysis['completeness_by_symbol'].items():
print(f" {symbol}: {stats['records_with_santiment']}/{stats['total_records']} "
f"({stats['completeness_pct']:.1f}%)")
# Save results
output_file, analysis_file = save_results(merged_df, analysis)
print(f"\nFiles saved:")
print(f" Merged data: {output_file}")
print(f" Analysis: {analysis_file}")
print("="*60)
logger.info("Merge process completed successfully!")
except Exception as e:
logger.error(f"Merge process failed: {e}")
raise
if __name__ == "__main__":
main()