# advisorai-data-enhanced / src/merge/crypto_data_filler.py
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
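# NOTE: the imputers below derive per-symbol jitter from Python's built-in
# hash(). For str inputs, hash() is salted per interpreter run (PEP 456), so
# the "deterministic" fills are only reproducible within a single process
# unless PYTHONHASHSEED is pinned. A process-independent alternative would be
# a digest-based helper like the illustrative sketch below (an assumption of
# this note; it is not wired into the class):
import hashlib

def stable_hash(text, mod=1000):
    """Illustrative, process-independent stand-in for `hash(text) % mod`."""
    return int(hashlib.md5(text.encode("utf-8")).hexdigest(), 16) % mod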
class CryptoDataImputerFixed:
"""
Specialized imputation for cryptocurrency data that preserves unique
characteristics of different crypto assets and prevents homogenization.
"""
def __init__(self, preserve_crypto_diversity=True):
self.preserve_crypto_diversity = preserve_crypto_diversity
self.crypto_profiles = {}
self.scalers = {}
def _create_crypto_profiles(self, df):
"""Create profiles for each cryptocurrency to guide imputation."""
profiles = {}
for symbol in df['symbol'].unique():
symbol_data = df[df['symbol'] == symbol]
# Calculate crypto-specific statistics
# Defensive mode extraction for 'stable' and 'blockchain_network'
stable_mode = symbol_data['stable'].mode() if 'stable' in symbol_data.columns else pd.Series()
is_stablecoin = stable_mode.iloc[0] if not stable_mode.empty else False
network_mode = symbol_data['blockchain_network'].mode() if 'blockchain_network' in symbol_data.columns else pd.Series()
blockchain_network = network_mode.iloc[0] if not network_mode.empty else None
profile = {
'symbol': symbol,
'price_level': symbol_data['price'].median() if 'price' in symbol_data.columns else None,
'price_volatility': symbol_data['price'].std() if 'price' in symbol_data.columns else None,
'volume_level': symbol_data['volume'].median() if 'volume' in symbol_data.columns else None,
'marketcap_level': symbol_data['marketcap'].median() if 'marketcap' in symbol_data.columns else None,
'dominance_level': symbol_data['dominance'].median() if 'dominance' in symbol_data.columns else None,
'rank': symbol_data['rank'].median() if 'rank' in symbol_data.columns else None,
'is_stablecoin': is_stablecoin,
'typical_rsi': symbol_data['rsi'].median() if 'rsi' in symbol_data.columns else None,
'blockchain_network': blockchain_network,
'has_onchain_data': symbol_data['transaction_count'].notna().any() if 'transaction_count' in symbol_data.columns else False,
'exchange_coverage': len([col for col in symbol_data.columns if col.startswith('symbols.') and symbol_data[col].notna().any()]),
'data_availability': len(symbol_data) / len(df) if len(df) > 0 else 0
}
profiles[symbol] = profile
return profiles
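    # Illustrative shape of one resulting profile (values are hypothetical):
    #   profiles['BTC'] == {
    #       'symbol': 'BTC', 'price_level': 43210.5, 'price_volatility': 1830.2,
    #       'volume_level': 2.1e10, 'marketcap_level': 8.4e11,
    #       'dominance_level': 0.52, 'rank': 1.0, 'is_stablecoin': False,
    #       'typical_rsi': 54.3, 'blockchain_network': 'bitcoin',
    #       'has_onchain_data': True, 'exchange_coverage': 12,
    #       'data_availability': 0.03,
    #   }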
def _impute_with_crypto_context(self, df, column, crypto_profiles):
"""Impute values using crypto-specific context to prevent homogenization."""
df_result = df.copy()
for symbol in df['symbol'].unique():
symbol_mask = df['symbol'] == symbol
symbol_data = df.loc[symbol_mask, column]
if symbol_data.isnull().sum() == 0:
continue # No missing values for this symbol
profile = crypto_profiles.get(symbol, {})
is_stablecoin = profile.get('is_stablecoin', False)
            rank = profile.get('rank', 999)
            if rank is None or pd.isna(rank):
                rank = 999  # unknown rank: treat as a long-tail asset
# Strategy depends on column type and crypto characteristics
if column in ['price', 'open', 'high', 'low', 'close']:
# Price data - special handling for stablecoins
if is_stablecoin:
# Stablecoins should stay around $1
base_price = 1.0
symbol_hash = hash(symbol + column) % 1000 / 100000 # Very small variation
adjusted_price = base_price + symbol_hash
else:
# Regular crypto - use interpolation with crypto-specific bounds
interpolated = symbol_data.interpolate(method='linear', limit_direction='both')
# If still missing, use crypto's typical price level with volatility-based noise
if interpolated.isnull().any() and profile.get('price_level'):
base_price = profile['price_level']
volatility = profile.get('price_volatility', base_price * 0.05) # Crypto is more volatile
# Add crypto-specific noise based on rank (higher rank = more volatile)
symbol_hash = hash(symbol) % 1000 / 1000 # 0-1 range
volatility_multiplier = 1 + (rank / 100) # Higher rank = higher volatility
noise_factor = (symbol_hash - 0.5) * 0.2 * volatility_multiplier # More volatile than stocks
adjusted_price = base_price * (1 + noise_factor)
else:
adjusted_price = interpolated
df_result.loc[symbol_mask, column] = symbol_data.fillna(adjusted_price)
elif column in ['volume', 'volume_alpaca']:
# Volume data - crypto volume patterns differ significantly
                filled = symbol_data.ffill().bfill()
if filled.isnull().any():
base_volume = profile.get('volume_level', 1000000) # Default higher for crypto
# Major cryptos have much higher volume
if rank and rank <= 10:
volume_multiplier = 5 + (hash(symbol + column) % 1000 / 200) # 5x-10x
elif rank and rank <= 50:
volume_multiplier = 1 + (hash(symbol + column) % 1000 / 500) # 1x-3x
else:
volume_multiplier = 0.1 + (hash(symbol + column) % 1000 / 1000) # 0.1x-1.1x
adjusted_volume = base_volume * volume_multiplier
filled = filled.fillna(adjusted_volume)
df_result.loc[symbol_mask, column] = filled
elif column in ['marketcap']:
# Market cap - highly dependent on rank
if profile.get('marketcap_level'):
baseline = profile['marketcap_level']
else:
# Estimate based on rank
if rank and rank <= 10:
baseline = 10_000_000_000 # $10B+ for top 10
elif rank and rank <= 50:
baseline = 1_000_000_000 # $1B+ for top 50
elif rank and rank <= 100:
baseline = 100_000_000 # $100M+ for top 100
else:
baseline = 10_000_000 # $10M+ for others
# Add symbol-specific variation
symbol_hash = hash(symbol + column) % 1000 / 1000
baseline *= (0.5 + symbol_hash) # 0.5x to 1.5x variation
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
elif column in ['dominance']:
# Market dominance - only meaningful for major cryptos
if rank and rank <= 5:
# Major cryptos have meaningful dominance
symbol_hash = hash(symbol + column) % 1000 / 1000
if symbol.upper() == 'BTC':
baseline = 0.4 + (symbol_hash * 0.2) # BTC: 40-60%
elif symbol.upper() == 'ETH':
baseline = 0.15 + (symbol_hash * 0.1) # ETH: 15-25%
else:
baseline = 0.01 + (symbol_hash * 0.05) # Others: 1-6%
else:
baseline = 0.001 + (hash(symbol + column) % 1000 / 100000) # Very small
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
elif column in ['rsi', 'stoch_k', 'stoch_d']:
# Oscillator indicators - crypto markets are more extreme
symbol_median = symbol_data.median()
if pd.isna(symbol_median):
symbol_hash = hash(symbol + column) % 1000 / 1000
if column == 'rsi':
# Crypto RSI tends to be more extreme
if rank and rank <= 10: # Major cryptos more stable
baseline = 20 + (symbol_hash * 60) # 20-80 range
else: # Alt coins more extreme
baseline = 10 + (symbol_hash * 80) # 10-90 range
else: # stochastic
baseline = 10 + (symbol_hash * 80) # 10-90 range
else:
baseline = symbol_median
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
elif column in ['macd', 'macd_signal', 'macd_histogram']:
# MACD - crypto MACD values tend to be more volatile
symbol_median = symbol_data.median()
if pd.isna(symbol_median):
price_level = profile.get('price_level', 1)
symbol_hash = hash(symbol + column) % 2000 / 1000 - 1 # -1 to +1
# Scale MACD relative to price level and volatility
volatility_factor = 2 if rank and rank > 50 else 1 # Alt coins more volatile
baseline = (price_level * 0.01 * volatility_factor) * symbol_hash
else:
baseline = symbol_median
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
elif column.startswith('performance.'):
# Performance metrics - crypto performance is more extreme
symbol_median = symbol_data.median()
if pd.isna(symbol_median):
symbol_hash = hash(symbol + column) % 2000 / 1000 - 1 # -1 to +1
# Different baselines for different timeframes
if 'year' in column:
baseline = symbol_hash * 5 # ±500% annual performance possible
elif 'month' in column:
baseline = symbol_hash * 2 # ±200% monthly performance possible
elif 'week' in column:
baseline = symbol_hash * 0.5 # ±50% weekly performance possible
elif 'day' in column:
baseline = symbol_hash * 0.2 # ±20% daily performance possible
else: # hour, min
baseline = symbol_hash * 0.05 # ±5% short-term performance
# Alt coins are more volatile
if rank and rank > 50:
baseline *= 2
else:
baseline = symbol_median
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
elif column.startswith('tx_') or column.startswith('gas_') or column in [
'transaction_volume', 'transaction_count', 'total_fees', 'total_gas_used',
'avg_gas_price', 'avg_tx_size', 'fees_7d_change', 'gas_used_7d_change', 'gas_price_7d_change'
] or '_7d_change' in column:
# On-chain features - only meaningful for blockchains with transaction data
network = profile.get('blockchain_network', 'unknown')
# Special handling for 7d change columns
if '7d_change' in column:
# These are percentage changes, should be reasonable values
symbol_hash = hash(symbol + column) % 2000 / 1000 - 1 # -1 to +1 range
if 'fees' in column.lower():
# Fee changes can be more volatile in crypto
baseline = symbol_hash * 0.5 # ±50% change
elif 'gas' in column.lower():
# Gas usage changes
baseline = symbol_hash * 0.3 # ±30% change
else:
# Other transaction-related changes
baseline = symbol_hash * 0.4 # ±40% change
# Alt coins more volatile
if rank and rank > 100:
baseline *= 2
elif network in ['ethereum', 'bitcoin', 'polygon', 'bsc', 'avalanche']:
# Major networks have meaningful on-chain data
symbol_median = symbol_data.median()
if pd.isna(symbol_median):
# Estimate based on network and rank
symbol_hash = hash(symbol + column) % 1000 / 1000
if 'count' in column.lower():
if network == 'ethereum':
baseline = 1000000 * (1 + symbol_hash) # High transaction count
elif network == 'bitcoin':
baseline = 300000 * (1 + symbol_hash) # Lower transaction count
else:
baseline = 500000 * (1 + symbol_hash) # Medium transaction count
elif 'gas' in column.lower():
if network == 'ethereum':
baseline = 50 * (1 + symbol_hash) # Higher gas prices
else:
baseline = 5 * (1 + symbol_hash) # Lower gas prices
elif 'fee' in column.lower():
baseline = 1000000 * (1 + symbol_hash) # Transaction fees in wei/satoshi
else:
# Other on-chain metrics
baseline = symbol_hash * 1000
else:
baseline = symbol_median
                else:
                    # Networks without meaningful on-chain data
                    # (7d_change columns were already handled in the first branch)
                    baseline = 0
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
elif column.startswith('exchangePrices.') or column.startswith('symbols.'):
# Exchange-specific data
exchange = column.split('.')[1] if '.' in column else 'unknown'
if column.startswith('exchangePrices.'):
# Use main price with small exchange-specific variation
main_price = profile.get('price_level', 100)
if main_price and not is_stablecoin:
# Different exchanges have small price differences
                        exchange_hash = hash(symbol + exchange) % 200 / 10000  # 0-2% variation across exchanges
baseline = main_price * (1 + exchange_hash)
else:
baseline = main_price or 1
else:
# Exchange symbols - should be strings, handle separately
continue
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
else:
# Generic numeric imputation with crypto-specific variation
symbol_median = symbol_data.median()
if pd.isna(symbol_median):
overall_median = df[column].median()
if pd.isna(overall_median):
overall_median = 0
# Add crypto-specific variation based on rank and volatility
symbol_hash = hash(symbol + column) % 2000 / 1000 - 1 # -1 to +1
volatility_factor = 2 if rank and rank > 100 else 1
variation = overall_median * 0.2 * symbol_hash * volatility_factor
baseline = overall_median + variation
else:
baseline = symbol_median
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
return df_result[column]
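    # Usage sketch (hypothetical call): the method returns only the imputed
    # column, index-aligned with df, so it can be assigned straight back:
    #   profiles = imputer._create_crypto_profiles(df)
    #   df['price'] = imputer._impute_with_crypto_context(df, 'price', profiles)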
def _force_fill_stubborn_nulls(self, df):
"""Aggressively fill any remaining nulls with appropriate defaults."""
# Target ALL the problematic 7d_change columns
stubborn_cols = ['fees_7d_change', 'gas_used_7d_change', 'gas_price_7d_change']
for col in stubborn_cols:
if col in df.columns:
null_count_before = df[col].isnull().sum()
if null_count_before > 0:
# Strategy 1: Try group-based fill first
df[col] = df.groupby('symbol')[col].transform(lambda x: x.fillna(x.median()))
# Strategy 2: Fill remaining with symbol-specific hash-based values
still_null = df[col].isnull()
if still_null.any():
for symbol in df[still_null]['symbol'].unique():
symbol_mask = (df['symbol'] == symbol) & df[col].isnull()
if symbol_mask.any():
# Create deterministic but varied values based on symbol
symbol_hash = hash(symbol + col) % 2000 / 1000 - 1 # -1 to +1
if 'fees' in col.lower():
fill_value = symbol_hash * 0.3 # ±30% fee change
elif 'gas' in col.lower():
fill_value = symbol_hash * 0.25 # ±25% gas change
else:
fill_value = symbol_hash * 0.2 # ±20% generic change
df.loc[symbol_mask, col] = fill_value
# Strategy 3: Nuclear option - fill any remaining with 0
remaining_nulls = df[col].isnull().sum()
if remaining_nulls > 0:
print(f"[WARNING] Nuclear fill: {remaining_nulls} nulls in {col} filled with 0")
df[col] = df[col].fillna(0)
return df
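    # Worked example of the Strategy 1 group-median fill (toy values):
    #   one symbol's fees_7d_change = [0.10, NaN, 0.30]
    #   group median = 0.20, so the series becomes [0.10, 0.20, 0.30];
    #   an all-NaN group has a NaN median and falls through to Strategy 2.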
def _nuclear_null_elimination(self, df):
"""Final pass to eliminate ALL nulls with extreme prejudice."""
print("[INFO] Performing nuclear null elimination...")
# Get all numeric columns
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
null_count = df[col].isnull().sum()
if null_count > 0:
print(f"[NUCLEAR] Eliminating {null_count} nulls in {col}")
# Try different strategies in order
if '7d_change' in col or 'change' in col.lower():
# Change columns - use symbol-specific hash
for symbol in df['symbol'].unique():
symbol_mask = (df['symbol'] == symbol) & df[col].isnull()
if symbol_mask.any():
symbol_hash = hash(symbol + col) % 2000 / 1000 - 1 # -1 to +1
if 'fees' in col.lower():
fill_value = symbol_hash * 0.3
elif 'gas' in col.lower():
fill_value = symbol_hash * 0.25
else:
fill_value = symbol_hash * 0.2
df.loc[symbol_mask, col] = fill_value
elif 'timestamp' in col.lower():
# Timestamp columns
                    df[col] = df[col].ffill().bfill().fillna(0)
elif col in ['price', 'open', 'high', 'low', 'close']:
# Price columns - use symbol-specific values
for symbol in df['symbol'].unique():
symbol_mask = (df['symbol'] == symbol) & df[col].isnull()
if symbol_mask.any():
symbol_price = df[df['symbol'] == symbol][col].median()
if pd.isna(symbol_price):
symbol_hash = hash(symbol + col) % 10000 / 100 # 0-100 range
symbol_price = 1 + symbol_hash # $1-$101
df.loc[symbol_mask, col] = symbol_price
else:
# Generic columns - try median first, then 0
median_val = df[col].median()
if pd.isna(median_val):
median_val = 0
df[col] = df[col].fillna(median_val)
# Final check - if still nulls, force to 0
remaining_nulls = df[col].isnull().sum()
if remaining_nulls > 0:
print(f"[NUCLEAR] Force filling {remaining_nulls} remaining nulls in {col} with 0")
df[col] = df[col].fillna(0)
return df
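    # Dispatch order used above (sketch): change-style columns get per-symbol
    # hash jitter, timestamps forward/backward fill, prices reuse the symbol
    # median (or a hash-derived $1-$101 placeholder), everything else takes the
    # column median, and a final fillna(0) backstops any survivors.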
def _enhanced_sentiment_imputation(self, df):
"""Enhanced sentiment imputation that creates realistic, diverse sentiment values."""
print(f"[INFO] Starting enhanced sentiment imputation...")
# Define sentiment columns
core_sentiment_cols = ['sentiment_score', 'neg', 'neu', 'pos']
for col in core_sentiment_cols:
if col in df.columns:
null_count_before = df[col].isnull().sum()
if null_count_before > 0:
print(f"[INFO] Processing {col}: {null_count_before} nulls to fill")
# Process each symbol separately for core sentiment columns
for col in core_sentiment_cols:
if col in df.columns and df[col].isnull().any():
print(f"Enhanced imputation for {col}...")
for symbol in df['symbol'].unique():
symbol_mask = df['symbol'] == symbol
symbol_sentiment = df.loc[symbol_mask, col]
if symbol_sentiment.isnull().any():
# Try forward/backward fill first
                        filled = symbol_sentiment.ffill().bfill()
# For remaining nulls, use symbol-specific realistic values
if filled.isnull().any():
symbol_hash = hash(symbol + col) % 10000 / 10000
symbol_upper = symbol.upper()
# Define crypto categories
stablecoins = ['USDT', 'USDC', 'BUSD', 'DAI', 'TUSD', 'USDP']
major_cryptos = ['BTC', 'ETH', 'BNB', 'ADA', 'XRP', 'SOL', 'DOT', 'AVAX']
if col == 'sentiment_score':
# Sentiment score (-1 to +1)
if any(stable in symbol_upper for stable in stablecoins):
fill_value = (symbol_hash - 0.5) * 0.1 # Stable: ±0.05
elif any(major in symbol_upper for major in major_cryptos):
fill_value = 0.1 + (symbol_hash - 0.5) * 0.4 # Major: 0.1 ± 0.2
else:
fill_value = (symbol_hash - 0.5) * 0.6 # Alt: ±0.3
fill_value = np.clip(fill_value, -1.0, 1.0)
elif col == 'neu':
# Neutral sentiment (dominant)
if any(stable in symbol_upper for stable in stablecoins):
fill_value = 0.85 + symbol_hash * 0.1 # 0.85-0.95
elif any(major in symbol_upper for major in major_cryptos):
fill_value = 0.65 + symbol_hash * 0.2 # 0.65-0.85
else:
fill_value = 0.55 + symbol_hash * 0.3 # 0.55-0.85
fill_value = np.clip(fill_value, 0.0, 1.0)
elif col == 'pos':
# Positive sentiment
if any(stable in symbol_upper for stable in stablecoins):
fill_value = 0.05 + symbol_hash * 0.05 # 0.05-0.10
elif any(major in symbol_upper for major in major_cryptos):
fill_value = 0.15 + symbol_hash * 0.15 # 0.15-0.30
else:
fill_value = 0.10 + symbol_hash * 0.25 # 0.10-0.35
fill_value = np.clip(fill_value, 0.0, 1.0)
elif col == 'neg':
# Negative sentiment
if any(stable in symbol_upper for stable in stablecoins):
fill_value = 0.05 + symbol_hash * 0.05 # 0.05-0.10
elif any(major in symbol_upper for major in major_cryptos):
fill_value = 0.10 + symbol_hash * 0.10 # 0.10-0.20
else:
fill_value = 0.15 + symbol_hash * 0.15 # 0.15-0.30
fill_value = np.clip(fill_value, 0.0, 1.0)
filled = filled.fillna(fill_value)
df.loc[symbol_mask, col] = filled
# Normalize sentiment scores so neg + neu + pos = 1.0
        if all(col in df.columns for col in ['neg', 'neu', 'pos']):
            print("Normalizing sentiment scores...")
            sentiment_sum = df['neg'] + df['neu'] + df['pos']
            valid = sentiment_sum > 0
            df.loc[valid, ['neg', 'neu', 'pos']] = (
                df.loc[valid, ['neg', 'neu', 'pos']].div(sentiment_sum[valid], axis=0)
            )
            # Rows whose components sum to zero (or NaN) fall back to a neutral default
            df.loc[~valid, 'neg'] = 0.1
            df.loc[~valid, 'neu'] = 0.8
            df.loc[~valid, 'pos'] = 0.1
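        # Worked example: raw (neg, neu, pos) = (0.2, 1.0, 0.4) sums to 1.6 and
        # normalizes to (0.125, 0.625, 0.25), which sums to exactly 1.0.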
# Handle other sentiment features
other_sentiment_features = [
'social_sentiment_mean', 'social_sentiment_std', 'social_sentiment_count',
'social_confidence_mean', 'combined_sentiment', 'sentiment_agreement',
'sentiment_change_1', 'sentiment_sma_7', 'sentiment_momentum'
]
for col in other_sentiment_features:
if col in df.columns and df[col].isnull().any():
if 'sentiment' in col.lower() and 'count' not in col.lower():
# Sentiment scores - neutral with crypto-specific variation
for symbol in df['symbol'].unique():
mask = df['symbol'] == symbol
symbol_hash = (hash(symbol + col) % 200 / 1000) - 0.1 # -0.1 to +0.1
df.loc[mask, col] = df.loc[mask, col].fillna(symbol_hash)
elif 'count' in col.lower():
df[col] = df[col].fillna(0)
else:
median_val = df[col].median()
if pd.isna(median_val):
median_val = 0
df[col] = df[col].fillna(median_val)
# Final validation
print(f"[INFO] Enhanced sentiment imputation completed:")
for col in core_sentiment_cols:
if col in df.columns:
null_count_after = df[col].isnull().sum()
print(f" {col}: {null_count_after} nulls remaining")
return df
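    # Note: fit_transform below applies its own lighter sentiment pass (step 10)
    # and does not call this routine; it can be invoked separately when the
    # richer per-symbol fills are wanted, e.g.:
    #   df = imputer._enhanced_sentiment_imputation(df)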
def fit_transform(self, df):
"""Apply crypto-specific imputation with anti-homogenization measures."""
df_imputed = df.copy()
df_imputed = df_imputed.sort_values(['symbol', 'interval_timestamp'])
# Create crypto profiles
self.crypto_profiles = self._create_crypto_profiles(df_imputed)
print(f"Created profiles for {len(self.crypto_profiles)} unique cryptocurrencies")
# 1. Handle categorical/flag columns
categorical_cols = [
'symbol', 'cg_id', 'blockchain_network', 'stable', 'is_crypto', 'is_stock',
'is_other', 'alpaca_data_available', 'is_trading_hours', 'is_weekend'
]
for col in categorical_cols:
if col in df_imputed.columns:
if col in ['is_crypto']:
df_imputed[col] = df_imputed[col].fillna(1) # Default to crypto
elif col in ['is_stock', 'is_other']:
df_imputed[col] = df_imputed[col].fillna(0) # Not stock/other
elif col in ['stable']:
# Determine if stablecoin based on symbol
stablecoin_symbols = ['USDT', 'USDC', 'BUSD', 'DAI', 'TUSD', 'USDP']
for symbol in stablecoin_symbols:
mask = df_imputed['symbol'].str.contains(symbol, case=False, na=False)
df_imputed.loc[mask, col] = df_imputed.loc[mask, col].fillna(True)
df_imputed[col] = df_imputed[col].fillna(False)
else:
                    df_imputed[col] = df_imputed.groupby('symbol')[col].ffill().bfill()
# 2. Exchange symbols (string data)
exchange_symbol_cols = [col for col in df_imputed.columns if col.startswith('symbols.')]
for col in exchange_symbol_cols:
if df_imputed[col].dtype == 'object':
# Forward/backward fill within symbol groups
            df_imputed[col] = df_imputed.groupby('symbol')[col].ffill().bfill()
# 3. Core crypto market data
core_market_cols = [
'price', 'marketcap', 'volume', 'dominance', 'rank',
'open', 'high', 'low', 'close'
]
for col in core_market_cols:
if col in df_imputed.columns and df_imputed[col].isnull().any():
print(f"Imputing {col} with crypto-specific context...")
df_imputed[col] = self._impute_with_crypto_context(
df_imputed, col, self.crypto_profiles
)
# 4. Exchange prices
exchange_price_cols = [col for col in df_imputed.columns if col.startswith('exchangePrices.')]
for col in exchange_price_cols:
if df_imputed[col].isnull().any():
print(f"Imputing {col} with crypto-specific context...")
df_imputed[col] = self._impute_with_crypto_context(
df_imputed, col, self.crypto_profiles
)
# 5. Performance metrics
performance_cols = [col for col in df_imputed.columns if col.startswith('performance.') or col.startswith('rankDiffs.')]
for col in performance_cols:
if df_imputed[col].isnull().any():
print(f"Imputing {col} with crypto-specific context...")
df_imputed[col] = self._impute_with_crypto_context(
df_imputed, col, self.crypto_profiles
)
# 6. Technical indicators
tech_indicators = [
'rsi', 'macd', 'macd_signal', 'macd_histogram', 'atr', 'bb_position',
'stoch_k', 'stoch_d', 'cci', 'roc_5', 'roc_10', 'mfi', 'rsi_macd_signal',
'ema_convergence', 'true_range_pct'
]
for col in tech_indicators:
if col in df_imputed.columns and df_imputed[col].isnull().any():
print(f"Imputing {col} with crypto-specific context...")
df_imputed[col] = self._impute_with_crypto_context(
df_imputed, col, self.crypto_profiles
)
# 7. Price/volume change features
change_features = [
'price_change_1', 'price_change_7', 'price_change_14', 'volume_ratio',
'volatility_7', 'price_volume_trend', 'volatility_consistency'
]
for col in change_features:
if col in df_imputed.columns and df_imputed[col].isnull().any():
df_imputed[col] = self._impute_with_crypto_context(
df_imputed, col, self.crypto_profiles
)
# 8. On-chain features (crypto-specific) - PRIORITY HANDLING for problematic columns
onchain_features = [
'transaction_volume', 'total_fees', 'total_gas_used', 'avg_gas_price',
'transaction_count', 'tx_count_7d_change', 'tx_count_sma_7',
'tx_volume_7d_change', 'tx_volume_sma_7', 'gas_used_7d_change',
'gas_used_sma_7', 'gas_price_7d_change', 'gas_price_sma_7',
'fees_7d_change', 'avg_tx_size', 'tx_price_correlation'
]
for col in onchain_features:
if col in df_imputed.columns and df_imputed[col].isnull().any():
print(f"Imputing {col} with crypto on-chain context...")
df_imputed[col] = self._impute_with_crypto_context(
df_imputed, col, self.crypto_profiles
)
# 9. AGGRESSIVE NULL ELIMINATION for stubborn columns
df_imputed = self._force_fill_stubborn_nulls(df_imputed)
# 10. Sentiment features
sentiment_features = [
'social_sentiment_mean', 'social_sentiment_std', 'social_sentiment_count',
'social_confidence_mean', 'combined_sentiment', 'sentiment_agreement',
'sentiment_change_1', 'sentiment_sma_7', 'sentiment_momentum',
'sentiment_score', 'neg', 'neu', 'pos'
]
for col in sentiment_features:
if col in df_imputed.columns and df_imputed[col].isnull().any():
if 'sentiment' in col.lower() and 'count' not in col.lower():
# Sentiment scores - neutral with crypto-specific variation
for symbol in df_imputed['symbol'].unique():
mask = df_imputed['symbol'] == symbol
symbol_hash = (hash(symbol + col) % 200 / 1000) - 0.1 # -0.1 to +0.1
df_imputed.loc[mask, col] = df_imputed.loc[mask, col].fillna(symbol_hash)
elif 'count' in col.lower():
df_imputed[col] = df_imputed[col].fillna(0)
                else:
                    median_val = df_imputed[col].median()
                    if pd.isna(median_val):
                        median_val = 0
                    df_imputed[col] = df_imputed[col].fillna(median_val)
# 11. Quality metrics
quality_features = [
'data_quality_score', 'core_features_completeness', 'technical_indicators_completeness',
'onchain_features_completeness', 'price_data_completeness',
'overall_feature_completeness', 'data_completeness_score'
]
for col in quality_features:
if col in df_imputed.columns and df_imputed[col].isnull().any():
                median_val = df_imputed[col].median()
                # All-null column: assume a mid-scale 0.5 default for completeness scores
                median_val = 0.5 if pd.isna(median_val) else float(np.clip(median_val, 0, 1))
# Add tiny crypto-specific variation
for symbol in df_imputed['symbol'].unique():
mask = df_imputed['symbol'] == symbol
symbol_hash = hash(symbol + col) % 100 / 10000 # Very small variation
fill_val = np.clip(median_val + symbol_hash, 0, 1)
df_imputed.loc[mask, col] = df_imputed.loc[mask, col].fillna(fill_val)
# 12. Temporal features
temporal_features = ['hour', 'day_of_week', 'is_weekend', 'is_trading_hours']
for col in temporal_features:
if col in df_imputed.columns and df_imputed[col].isnull().any():
if col == 'hour':
df_imputed[col] = df_imputed[col].fillna(12) # Default to noon
elif col == 'day_of_week':
df_imputed[col] = df_imputed[col].fillna(3) # Default to Wednesday
elif col == 'is_weekend':
df_imputed[col] = df_imputed[col].fillna(0) # Default to weekday
elif col == 'is_trading_hours':
df_imputed[col] = df_imputed[col].fillna(1) # Crypto trades 24/7
# 13. Handle any remaining numeric columns
remaining_numeric = df_imputed.select_dtypes(include=[np.number]).columns
remaining_with_nulls = [col for col in remaining_numeric if df_imputed[col].isnull().any()]
for col in remaining_with_nulls:
if col not in ['id', 'id_alpaca', 'backup_id'] and not col.endswith('_timestamp'):
print(f"Imputing remaining column {col}...")
df_imputed[col] = self._impute_with_crypto_context(
df_imputed, col, self.crypto_profiles
)
# 14. NUCLEAR NULL ELIMINATION - Final pass
df_imputed = self._nuclear_null_elimination(df_imputed)
print("[INFO] Crypto imputation complete with anti-homogenization measures")
return df_imputed
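# Minimal smoke-test sketch (hypothetical toy data; not executed on import).
# It assumes only the schema fit_transform relies on: a 'symbol' column, an
# 'interval_timestamp' column, and numeric feature columns with gaps.
def _example_fit_transform():
    toy = pd.DataFrame({
        'symbol': ['BTC', 'BTC', 'ETH', 'ETH'],
        'interval_timestamp': [1, 2, 1, 2],
        'price': [43000.0, np.nan, 2300.0, 2310.0],
        'volume': [np.nan, 1.2e10, 8.0e9, np.nan],
        'rank': [1, 1, 2, 2],
    })
    filled = CryptoDataImputerFixed().fit_transform(toy)
    # Every gap should be filled without collapsing symbols onto shared values
    assert filled[['price', 'volume']].isnull().sum().sum() == 0
    return filled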
# Usage function with validation - FIXED VERSION
def impute_crypto_with_validation_fixed(file_path, output_path=None):
"""Impute crypto data and validate no homogenization occurred."""
try:
df = pd.read_parquet(file_path)
except Exception as e:
print(f"[ERROR] Failed to load file: {e}")
return None
# Sample symbols for validation
symbols_sample = df['symbol'].unique()[:5]
imputer = CryptoDataImputerFixed()
df_imputed = imputer.fit_transform(df)
# TRIPLE CHECK: Ensure problematic columns have no nulls
problematic_cols = ['gas_used_7d_change', 'fees_7d_change', 'gas_price_7d_change']
for col in problematic_cols:
if col in df_imputed.columns:
null_count = df_imputed[col].isnull().sum()
if null_count > 0:
print(f"[EMERGENCY] Still {null_count} nulls in {col} - applying emergency fix")
# Emergency symbol-specific fill
for symbol in df_imputed['symbol'].unique():
symbol_mask = (df_imputed['symbol'] == symbol) & df_imputed[col].isnull()
if symbol_mask.any():
symbol_hash = hash(symbol + col) % 2000 / 1000 - 1 # -1 to +1
if 'fees' in col.lower():
fill_value = symbol_hash * 0.3
elif 'gas' in col.lower():
fill_value = symbol_hash * 0.25
else:
fill_value = symbol_hash * 0.2
df_imputed.loc[symbol_mask, col] = fill_value
# Final nuclear option
df_imputed[col] = df_imputed[col].fillna(0)
print(f"[EMERGENCY] {col} nulls after emergency fix: {df_imputed[col].isnull().sum()}")
# Combine alpaca data with main data if available
price_cols = ['high', 'low', 'close', 'volume', 'open']
for col in price_cols:
alpaca_col = f"{col}_alpaca"
if col in df_imputed.columns and alpaca_col in df_imputed.columns:
df_imputed[col] = df_imputed[col].combine_first(df_imputed[alpaca_col])
# Drop unwanted columns before saving
drop_cols = [
'_filename', '_original_format', 'alpaca_data_available',
'ask_exchange', 'ask_exchange_alpaca', 'bid_exchange', 'bid_exchange_alpaca',
'conditions', 'conditions_alpaca', 'conditions_trade', 'conditions_trade_alpaca',
'symbol_quote', 'symbol_quote_alpaca', 'symbol_trade', 'symbol_trade_alpaca',
'tape', 'tape_alpaca', 'tape_trade', 'tape_trade_alpaca',
'id', 'id_alpaca', 'is_new_symbol', 'timestamp_dt',
'estimateCurrency', 'exchange', 'exchange_alpaca', 'exchange_company',
'finnhubIndustry', 'logo', 'ticker', 'weburl', 'latest_news_timestamp', 'volume_price_momentum',
'country', 'currency', 'ipo', 'name', 'period', 'phone', 'year', 'month', 'symbols.kraken',
'datetime', 'headline', 'blockchain_network', 'symbols.cryptocom', 'symbols.bitmart', 'symbols.kucoin', 'symbols.okx',
'symbols.coinbase','symbols.binance','symbols.mexc','symbols.bybit','symbols.bingx', 'symbols.huobi', 'symbols.bitget', 'symbols.gateio',
'interval_timestamp_dt', 'interval_timestamp_alpaca', 'interval_timestamp_trade', 'feature_timestamp', 'alpaca_merge_timestamp', 'sentiment_timestamp',
'hour', 'day_of_week', 'is_weekend', 'is_trading_hours', 'is_crypto', 'is_stock', 'is_other', 'gas_used_7d_change', 'fees_7d_change', 'gas_price_7d_change'
]
# Remove alpaca columns after combining
alpaca_cols = [col for col in df_imputed.columns if col.endswith('_alpaca')]
drop_cols.extend(alpaca_cols)
    df_imputed = df_imputed.drop(columns=[col for col in drop_cols if col in df_imputed.columns])
# Reorder columns: 'symbol' first, 'interval_timestamp' second, rest follow
cols = list(df_imputed.columns)
if 'symbol' in cols and 'interval_timestamp' in cols:
rest = [c for c in cols if c not in ['symbol', 'interval_timestamp']]
df_imputed = df_imputed[['symbol', 'interval_timestamp'] + rest]
# FINAL FINAL CHECK for problematic columns (after all drops/reorders)
for col in problematic_cols:
if col in df_imputed.columns:
null_count = df_imputed[col].isnull().sum()
if null_count > 0:
print(f"[FINAL CHECK] Still {null_count} nulls in {col} - final nuclear fill")
df_imputed[col] = df_imputed[col].fillna(0)
# Validation: Check that different symbols have different values
print("\n[VALIDATION] Checking for homogenization...")
for symbol in symbols_sample:
symbol_data = df_imputed[df_imputed['symbol'] == symbol]
if len(symbol_data) > 0:
price_mean = symbol_data['price'].mean() if 'price' in symbol_data.columns else 0
volume_mean = symbol_data['volume'].mean() if 'volume' in symbol_data.columns else 0
print(f" {symbol}: Price={price_mean:.2f}, Volume={volume_mean:.0f}")
# Save results
if output_path:
# Clean up data types
if 'backup_id' in df_imputed.columns:
df_imputed['backup_id'] = df_imputed['backup_id'].astype(str)
try:
df_imputed.to_parquet(output_path, compression='snappy')
print(f"[INFO] Crypto data imputed and saved to: {output_path}")
except Exception as e:
print(f"[ERROR] Failed to save file: {e}")
# Debug: print null count, dtype, and sample after saving
# for col in problematic_cols:
# if col in df_imputed.columns:
# print(f"[DEBUG] Nulls in {col} after save: {df_imputed[col].isnull().sum()}")
# print(f"[DEBUG] Dtype for {col}: {df_imputed[col].dtype}")
# print(f"[DEBUG] Sample values for {col}: {df_imputed[col].head(10).tolist()}")
return df_imputed
# Example usage - FIXED VERSION
def main():
input_file = "data/merged/features/crypto_features.parquet"
output_file = input_file
df_clean = impute_crypto_with_validation_fixed(input_file, output_file)
if df_clean is not None:
print(f"\n[SUCCESS] Crypto data processing completed!")
print(f"Final shape: {df_clean.shape}")
print(f"Null values remaining: {df_clean.isnull().sum().sum()}")
# Final verification of problematic columns
problematic_cols = ['gas_used_7d_change', 'fees_7d_change', 'gas_price_7d_change']
for col in problematic_cols:
if col in df_clean.columns:
nulls = df_clean[col].isnull().sum()
print(f"[FINAL VERIFICATION] {col}: {nulls} nulls")
else:
print("[ERROR] Failed to load or impute crypto data.")
if __name__ == "__main__":
main()