#!/usr/bin/env python3
"""
Time-Shifted Santiment-Crypto Merger
===================================
This script handles the case where Santiment data and crypto data have different date ranges
due to API limitations. It performs a time-shifted merge using pattern matching.
Approaches:
1. Offset-based: Map August crypto data to July Santiment data with consistent offset
2. Day-of-week matching: Match same weekdays/times across different months
3. Pattern-based: Use similar market patterns from different time periods
"""
import json
import logging
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def load_data():
"""Load crypto and Santiment data"""
logger.info("Loading data files...")
# Load crypto features
crypto_file = 'data/merged/features/crypto_features.parquet'
crypto_df = pd.read_parquet(crypto_file)
crypto_df['datetime'] = pd.to_datetime(crypto_df['interval_timestamp'], unit='ms', utc=True)
# Load Santiment features
santiment_file = 'data/santiment/merged_features.parquet'
santiment_df = pd.read_parquet(santiment_file)
logger.info(f"Crypto: {len(crypto_df)} records from {crypto_df['datetime'].min()} to {crypto_df['datetime'].max()}")
logger.info(f"Santiment: {len(santiment_df)} records from {santiment_df.index.min()} to {santiment_df.index.max()}")
return crypto_df, santiment_df
def calculate_time_offset(crypto_df, santiment_df):
"""Calculate the time offset between datasets"""
crypto_start = crypto_df['datetime'].min()
santiment_start = santiment_df.index.min()
offset = crypto_start - santiment_start
logger.info(f"Time offset: {offset.days} days")
return offset
def merge_with_time_shift(crypto_df, santiment_df, method='offset'):
"""
Merge crypto and Santiment data using time-shift techniques
Args:
crypto_df: Crypto features DataFrame
santiment_df: Santiment features DataFrame
        method: 'offset' or 'day_of_week' (the 'pattern' approach from the
            module docstring is not dispatched here)
"""
logger.info(f"Starting time-shifted merge using method: {method}")
merged_results = []
symbol_mapping = {'BTC': 'BTC', 'ETH': 'ETH', 'ADA': 'ADA', 'SOL': 'SOL', 'XRP': 'XRP'}
if method == 'offset':
# Calculate consistent time offset
offset = calculate_time_offset(crypto_df, santiment_df)
for symbol, slug in symbol_mapping.items():
logger.info(f"Processing {symbol} β†’ {slug} with offset method")
crypto_symbol = crypto_df[crypto_df['symbol'] == symbol].copy()
santiment_slug = santiment_df[santiment_df['slug'] == slug].copy()
if crypto_symbol.empty or santiment_slug.empty:
logger.warning(f"Skipping {symbol} - missing data")
continue
# Apply offset to match timeframes
merged_symbol = merge_with_offset(crypto_symbol, santiment_slug, offset)
merged_results.append(merged_symbol)
elif method == 'day_of_week':
# Match same day-of-week and time patterns
for symbol, slug in symbol_mapping.items():
logger.info(f"Processing {symbol} β†’ {slug} with day-of-week method")
crypto_symbol = crypto_df[crypto_df['symbol'] == symbol].copy()
santiment_slug = santiment_df[santiment_df['slug'] == slug].copy()
if crypto_symbol.empty or santiment_slug.empty:
logger.warning(f"Skipping {symbol} - missing data")
continue
merged_symbol = merge_by_day_pattern(crypto_symbol, santiment_slug)
            merged_results.append(merged_symbol)
    else:
        logger.error(f"Unknown merge method: {method}")
        return None
    # Combine results
if merged_results:
merged_df = pd.concat(merged_results, ignore_index=True)
logger.info(f"Merge completed: {len(merged_df)} records")
return merged_df
else:
logger.error("No data could be merged!")
return None
def merge_with_offset(crypto_symbol, santiment_slug, offset):
"""Merge using consistent time offset"""
merged_records = []
for _, crypto_row in crypto_symbol.iterrows():
# Shift crypto timestamp back by offset to match Santiment timeframe
shifted_time = crypto_row['datetime'] - offset
        # Find the closest Santiment record by absolute time difference
        time_diffs = np.abs(santiment_slug.index - shifted_time)
        closest_pos = time_diffs.argmin()
        closest_label = santiment_slug.index[closest_pos]
        # Keep the match only if it is reasonably close (within 1 hour)
        if time_diffs.min() <= pd.Timedelta(hours=1):
            santiment_row = santiment_slug.loc[closest_label]
# Combine data
combined_row = crypto_row.copy()
for col in santiment_slug.columns:
if col != 'slug':
combined_row[f'santiment_{col}'] = santiment_row[col]
merged_records.append(combined_row)
return pd.DataFrame(merged_records)
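# Vectorized alternative (illustration only, not called by main()): the same
# offset-plus-nearest-timestamp join can be expressed with pd.merge_asof
# instead of the row-by-row loop above. Assumptions of this sketch:
# santiment_slug has a DatetimeIndex with the same timezone awareness as the
# crypto 'datetime' column, and crypto rows without a match within the
# tolerance keep NaN Santiment values instead of being dropped.
def merge_with_offset_asof(crypto_symbol, santiment_slug, offset,
                           tolerance=pd.Timedelta(hours=1)):
    """Sketch of merge_with_offset using pd.merge_asof (nearest match within tolerance)."""
    left = crypto_symbol.copy()
    # Shift crypto timestamps back by the offset so they fall in the Santiment range
    left['shifted_time'] = left['datetime'] - offset
    left = left.sort_values('shifted_time')
    right = (
        santiment_slug.drop(columns=['slug'], errors='ignore')
        .add_prefix('santiment_')
        .sort_index()
        .rename_axis('shifted_time')
        .reset_index()
    )
    merged = pd.merge_asof(
        left,
        right,
        on='shifted_time',
        direction='nearest',
        tolerance=tolerance,
    )
    return merged.drop(columns=['shifted_time'])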
def merge_by_day_pattern(crypto_symbol, santiment_slug):
"""Merge by matching day-of-week and time patterns"""
merged_records = []
for _, crypto_row in crypto_symbol.iterrows():
crypto_time = crypto_row['datetime']
# Find Santiment records with same day-of-week and similar time
santiment_same_weekday = santiment_slug[
santiment_slug.index.dayofweek == crypto_time.dayofweek
]
if not santiment_same_weekday.empty:
# Find closest time-of-day match
crypto_time_of_day = crypto_time.time()
time_diffs = santiment_same_weekday.index.map(
lambda x: abs((x.time().hour * 60 + x.time().minute) -
(crypto_time_of_day.hour * 60 + crypto_time_of_day.minute))
)
            closest_pos = time_diffs.argmin()
            closest_label = santiment_same_weekday.index[closest_pos]
            santiment_row = santiment_same_weekday.loc[closest_label]
# Combine data
combined_row = crypto_row.copy()
for col in santiment_slug.columns:
if col != 'slug':
combined_row[f'santiment_{col}'] = santiment_row[col]
merged_records.append(combined_row)
return pd.DataFrame(merged_records)
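# The module docstring lists a third, pattern-based approach that this script
# does not implement. The sketch below is one hypothetical interpretation,
# provided for illustration only and not called by merge_with_time_shift() or
# main(): build a "typical" Santiment profile per (weekday, hour) slot by
# averaging the numeric Santiment features across the Santiment range, then
# attach that profile to each crypto row falling in the same slot. The helper
# column names 'dow' and 'hour' are assumptions of this sketch.
def merge_by_seasonal_pattern(crypto_symbol, santiment_slug):
    """Hypothetical pattern-based merge: average Santiment features per weekday/hour slot."""
    numeric = santiment_slug.select_dtypes(include='number')
    # Average every numeric Santiment feature per (weekday, hour) slot
    profile = numeric.groupby(
        [numeric.index.dayofweek.to_numpy(), numeric.index.hour.to_numpy()]
    ).mean()
    profile.index = profile.index.set_names(['dow', 'hour'])
    profile = profile.add_prefix('santiment_').reset_index()
    left = crypto_symbol.copy()
    left['dow'] = left['datetime'].dt.dayofweek
    left['hour'] = left['datetime'].dt.hour
    merged = left.merge(profile, on=['dow', 'hour'], how='left')
    return merged.drop(columns=['dow', 'hour'])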
def analyze_merge_quality(merged_df, method):
"""Analyze merge quality and provide statistics"""
if merged_df is None or merged_df.empty:
return {"error": "No merged data"}
santiment_cols = [col for col in merged_df.columns if col.startswith('santiment_')]
analysis = {
'method_used': method,
'total_records': len(merged_df),
'santiment_features_added': len(santiment_cols),
'symbols_processed': sorted(merged_df['symbol'].unique()),
'completeness_by_symbol': {}
}
# Calculate completeness by symbol
for symbol in analysis['symbols_processed']:
symbol_data = merged_df[merged_df['symbol'] == symbol]
non_null_counts = symbol_data[santiment_cols].notna().sum(axis=1)
        # Cast to a plain int so the saved JSON stores a number (json.dump would
        # otherwise stringify the numpy integer via default=str)
        records_with_santiment = int((non_null_counts > 0).sum())
analysis['completeness_by_symbol'][symbol] = {
'total_records': len(symbol_data),
'records_with_santiment': records_with_santiment,
'completeness_pct': records_with_santiment / len(symbol_data) * 100
}
return analysis
def save_results(merged_df, analysis, method):
"""Save merged results with method identifier"""
if merged_df is None:
logger.error("Cannot save - no merged data")
return None, None
logger.info("Saving time-shifted merge results...")
# Create output directory
output_dir = 'data/merged/features'
os.makedirs(output_dir, exist_ok=True)
# Save with method identifier
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = os.path.join(output_dir, f'crypto_with_santiment_{method}_{timestamp}.parquet')
merged_df.to_parquet(output_file, index=False)
logger.info(f"Merged features saved to: {output_file}")
# Save analysis
analysis_file = os.path.join(output_dir, f'santiment_merge_analysis_{method}_{timestamp}.json')
with open(analysis_file, 'w') as f:
json.dump(analysis, f, indent=2, default=str)
logger.info(f"Analysis saved to: {analysis_file}")
return output_file, analysis_file
def main():
"""Main time-shifted merge process"""
logger.info("Starting time-shifted Santiment-Crypto merge...")
try:
# Load data
crypto_df, santiment_df = load_data()
# Try different merge methods
methods = ['offset', 'day_of_week']
results = {}
for method in methods:
logger.info(f"\n{'='*50}")
logger.info(f"TRYING METHOD: {method.upper()}")
logger.info(f"{'='*50}")
merged_df = merge_with_time_shift(crypto_df, santiment_df, method=method)
analysis = analyze_merge_quality(merged_df, method)
if merged_df is not None:
output_file, analysis_file = save_results(merged_df, analysis, method)
results[method] = {
'success': True,
'records': len(merged_df),
'completeness': analysis.get('completeness_by_symbol', {}),
'output_file': output_file
}
else:
results[method] = {'success': False}
# Print summary
print("\n" + "="*60)
print("TIME-SHIFTED MERGE SUMMARY")
print("="*60)
for method, result in results.items():
print(f"\n{method.upper()} METHOD:")
            if result['success']:
                print(f" ✅ Success: {result['records']} records merged")
                print(f" 📁 File: {result['output_file']}")
                for symbol, stats in result['completeness'].items():
                    print(f" {symbol}: {stats['completeness_pct']:.1f}% complete")
            else:
                print(" ❌ Failed")
print("="*60)
except Exception as e:
logger.error(f"Time-shifted merge failed: {e}")
raise
if __name__ == "__main__":
main()