import zlib
import warnings

import numpy as np
import pandas as pd

warnings.filterwarnings('ignore')


def _stable_hash(text):
    """Deterministic 32-bit hash of a string.

    The builtin hash() is salted per process for str, so it cannot provide the
    "deterministic but varied" per-symbol offsets this module relies on.
    """
    return zlib.crc32(text.encode('utf-8'))


class CryptoDataImputerFixed:
    """
    Specialized imputation for cryptocurrency data that preserves the unique
    characteristics of different crypto assets and prevents homogenization.
    """

    def __init__(self, preserve_crypto_diversity=True):
        self.preserve_crypto_diversity = preserve_crypto_diversity
        self.crypto_profiles = {}
        self.scalers = {}

    def _create_crypto_profiles(self, df):
        """Create profiles for each cryptocurrency to guide imputation."""
        profiles = {}

        for symbol in df['symbol'].unique():
            symbol_data = df[df['symbol'] == symbol]

            # Calculate crypto-specific statistics.
            # Defensive mode extraction for 'stable' and 'blockchain_network'.
            stable_mode = (symbol_data['stable'].mode()
                           if 'stable' in symbol_data.columns else pd.Series(dtype=object))
            is_stablecoin = stable_mode.iloc[0] if not stable_mode.empty else False

            network_mode = (symbol_data['blockchain_network'].mode()
                            if 'blockchain_network' in symbol_data.columns else pd.Series(dtype=object))
            blockchain_network = network_mode.iloc[0] if not network_mode.empty else None

            profile = {
                'symbol': symbol,
                'price_level': symbol_data['price'].median() if 'price' in symbol_data.columns else None,
                'price_volatility': symbol_data['price'].std() if 'price' in symbol_data.columns else None,
                'volume_level': symbol_data['volume'].median() if 'volume' in symbol_data.columns else None,
                'marketcap_level': symbol_data['marketcap'].median() if 'marketcap' in symbol_data.columns else None,
                'dominance_level': symbol_data['dominance'].median() if 'dominance' in symbol_data.columns else None,
                'rank': symbol_data['rank'].median() if 'rank' in symbol_data.columns else None,
                'is_stablecoin': is_stablecoin,
                'typical_rsi': symbol_data['rsi'].median() if 'rsi' in symbol_data.columns else None,
                'blockchain_network': blockchain_network,
                'has_onchain_data': (symbol_data['transaction_count'].notna().any()
                                     if 'transaction_count' in symbol_data.columns else False),
                'exchange_coverage': len([col for col in symbol_data.columns
                                          if col.startswith('symbols.') and symbol_data[col].notna().any()]),
                'data_availability': len(symbol_data) / len(df) if len(df) > 0 else 0,
            }
            profiles[symbol] = profile

        return profiles

    def _impute_with_crypto_context(self, df, column, crypto_profiles):
        """Impute values using crypto-specific context to prevent homogenization."""
        df_result = df.copy()

        for symbol in df['symbol'].unique():
            symbol_mask = df['symbol'] == symbol
            symbol_data = df.loc[symbol_mask, column]

            if symbol_data.isnull().sum() == 0:
                continue  # No missing values for this symbol

            profile = crypto_profiles.get(symbol, {})
            is_stablecoin = profile.get('is_stablecoin', False)
            rank = profile.get('rank')
            if rank is None or pd.isna(rank):
                rank = 999  # guard: profile rank may be missing or NaN

            # Strategy depends on column type and crypto characteristics
            if column in ['price', 'open', 'high', 'low', 'close']:
                # Price data - special handling for stablecoins
                if is_stablecoin:
                    # Stablecoins should stay around $1
                    base_price = 1.0
                    jitter = _stable_hash(symbol + column) % 1000 / 100000  # very small variation
                    filled = symbol_data.fillna(base_price + jitter)
                else:
                    # Regular crypto - use interpolation with crypto-specific bounds
                    filled = symbol_data.interpolate(method='linear', limit_direction='both')

                    # If still missing, use the crypto's typical price level with
                    # rank-scaled noise (crypto is more volatile than stocks)
                    if filled.isnull().any() and profile.get('price_level'):
                        base_price = profile['price_level']
                        symbol_hash = _stable_hash(symbol) % 1000 / 1000  # 0-1 range
                        volatility_multiplier = 1 + (rank / 100)  # higher rank number = more volatile
                        noise_factor = (symbol_hash - 0.5) * 0.2 * volatility_multiplier
                        filled = filled.fillna(base_price * (1 + noise_factor))

                df_result.loc[symbol_mask, column] = filled
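            # NOTE (added): throughout this method, _stable_hash supplies a fixed
            # per-(symbol, column) offset, so imputed values vary across assets
            # (the anti-homogenization goal) while staying reproducible across runs.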
            elif column in ['volume', 'volume_alpaca']:
                # Volume data - crypto volume patterns differ significantly
                filled = symbol_data.ffill().bfill()

                if filled.isnull().any():
                    base_volume = profile.get('volume_level') or 1000000  # default higher for crypto
                    # Major cryptos have much higher volume
                    if rank and rank <= 10:
                        volume_multiplier = 5 + (_stable_hash(symbol + column) % 1000 / 200)  # 5x-10x
                    elif rank and rank <= 50:
                        volume_multiplier = 1 + (_stable_hash(symbol + column) % 1000 / 500)  # 1x-3x
                    else:
                        volume_multiplier = 0.1 + (_stable_hash(symbol + column) % 1000 / 1000)  # 0.1x-1.1x

                    adjusted_volume = base_volume * volume_multiplier
                    filled = filled.fillna(adjusted_volume)

                df_result.loc[symbol_mask, column] = filled

            elif column in ['marketcap']:
                # Market cap - highly dependent on rank
                if profile.get('marketcap_level'):
                    baseline = profile['marketcap_level']
                else:
                    # Estimate based on rank
                    if rank and rank <= 10:
                        baseline = 10_000_000_000  # $10B+ for top 10
                    elif rank and rank <= 50:
                        baseline = 1_000_000_000  # $1B+ for top 50
                    elif rank and rank <= 100:
                        baseline = 100_000_000  # $100M+ for top 100
                    else:
                        baseline = 10_000_000  # $10M+ for others

                # Add symbol-specific variation
                symbol_hash = _stable_hash(symbol + column) % 1000 / 1000
                baseline *= (0.5 + symbol_hash)  # 0.5x to 1.5x variation

                df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)

            elif column in ['dominance']:
                # Market dominance - only meaningful for major cryptos
                if rank and rank <= 5:
                    # Major cryptos have meaningful dominance
                    symbol_hash = _stable_hash(symbol + column) % 1000 / 1000
                    if symbol.upper() == 'BTC':
                        baseline = 0.4 + (symbol_hash * 0.2)  # BTC: 40-60%
                    elif symbol.upper() == 'ETH':
                        baseline = 0.15 + (symbol_hash * 0.1)  # ETH: 15-25%
                    else:
                        baseline = 0.01 + (symbol_hash * 0.05)  # others: 1-6%
                else:
                    baseline = 0.001 + (_stable_hash(symbol + column) % 1000 / 100000)  # very small

                df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)

            elif column in ['rsi', 'stoch_k', 'stoch_d']:
                # Oscillator indicators - crypto markets are more extreme
                symbol_median = symbol_data.median()
                if pd.isna(symbol_median):
                    symbol_hash = _stable_hash(symbol + column) % 1000 / 1000
                    if column == 'rsi':
                        # Crypto RSI tends to be more extreme
                        if rank and rank <= 10:  # major cryptos more stable
                            baseline = 20 + (symbol_hash * 60)  # 20-80 range
                        else:  # alt coins more extreme
                            baseline = 10 + (symbol_hash * 80)  # 10-90 range
                    else:  # stochastic
                        baseline = 10 + (symbol_hash * 80)  # 10-90 range
                else:
                    baseline = symbol_median

                df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)

            elif column in ['macd', 'macd_signal', 'macd_histogram']:
                # MACD - crypto MACD values tend to be more volatile
                symbol_median = symbol_data.median()
                if pd.isna(symbol_median):
                    price_level = profile.get('price_level') or 1
                    symbol_hash = _stable_hash(symbol + column) % 2000 / 1000 - 1  # -1 to +1
                    # Scale MACD relative to price level and volatility
                    volatility_factor = 2 if rank and rank > 50 else 1  # alt coins more volatile
                    baseline = (price_level * 0.01 * volatility_factor) * symbol_hash
                else:
                    baseline = symbol_median
                df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)

            elif column.startswith('performance.'):
                # Performance metrics - crypto performance is more extreme
                symbol_median = symbol_data.median()
                if pd.isna(symbol_median):
                    symbol_hash = _stable_hash(symbol + column) % 2000 / 1000 - 1  # -1 to +1
                    # Different baselines for different timeframes
                    if 'year' in column:
                        baseline = symbol_hash * 5  # ±500% annual performance possible
                    elif 'month' in column:
                        baseline = symbol_hash * 2  # ±200% monthly performance possible
                    elif 'week' in column:
                        baseline = symbol_hash * 0.5  # ±50% weekly performance possible
                    elif 'day' in column:
                        baseline = symbol_hash * 0.2  # ±20% daily performance possible
                    else:  # hour, min
                        baseline = symbol_hash * 0.05  # ±5% short-term performance

                    # Alt coins are more volatile
                    if rank and rank > 50:
                        baseline *= 2
                else:
                    baseline = symbol_median

                df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)

            elif (column.startswith('tx_') or column.startswith('gas_') or column in [
                    'transaction_volume', 'transaction_count', 'total_fees', 'total_gas_used',
                    'avg_gas_price', 'avg_tx_size', 'fees_7d_change', 'gas_used_7d_change',
                    'gas_price_7d_change'] or '_7d_change' in column):
                # On-chain features - only meaningful for blockchains with transaction data
                network = profile.get('blockchain_network', 'unknown')

                # Special handling for 7d change columns
                if '7d_change' in column:
                    # These are percentage changes and should be reasonable values
                    symbol_hash = _stable_hash(symbol + column) % 2000 / 1000 - 1  # -1 to +1 range
                    if 'fees' in column.lower():
                        baseline = symbol_hash * 0.5  # ±50%: fee changes are more volatile in crypto
                    elif 'gas' in column.lower():
                        baseline = symbol_hash * 0.3  # ±30%: gas usage changes
                    else:
                        baseline = symbol_hash * 0.4  # ±40%: other transaction-related changes

                    # Alt coins more volatile
                    if rank and rank > 100:
                        baseline *= 2
                elif network in ['ethereum', 'bitcoin', 'polygon', 'bsc', 'avalanche']:
                    # Major networks have meaningful on-chain data
                    symbol_median = symbol_data.median()
                    if pd.isna(symbol_median):
                        # Estimate based on network and rank
                        symbol_hash = _stable_hash(symbol + column) % 1000 / 1000
                        if 'count' in column.lower():
                            if network == 'ethereum':
                                baseline = 1000000 * (1 + symbol_hash)  # high transaction count
                            elif network == 'bitcoin':
                                baseline = 300000 * (1 + symbol_hash)  # lower transaction count
                            else:
                                baseline = 500000 * (1 + symbol_hash)  # medium transaction count
                        elif 'gas' in column.lower():
                            if network == 'ethereum':
                                baseline = 50 * (1 + symbol_hash)  # higher gas prices
                            else:
                                baseline = 5 * (1 + symbol_hash)  # lower gas prices
                        elif 'fee' in column.lower():
                            baseline = 1000000 * (1 + symbol_hash)  # transaction fees in wei/satoshi
                        else:
                            baseline = symbol_hash * 1000  # other on-chain metrics
                    else:
                        baseline = symbol_median
                else:
                    # Networks without meaningful on-chain data
                    # (7d_change columns were already handled above)
                    baseline = 0

                df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
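            # NOTE (added): string-valued 'symbols.*' columns fall through to the
            # `continue` in the branch below and are instead forward/back-filled
            # per symbol in fit_transform step 2; only numeric exchange prices
            # are imputed here.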
            elif column.startswith('exchangePrices.') or column.startswith('symbols.'):
                # Exchange-specific data
                exchange = column.split('.')[1] if '.' in column else 'unknown'

                if column.startswith('exchangePrices.'):
                    # Use main price with a small exchange-specific variation
                    main_price = profile.get('price_level', 100)
                    if main_price and not is_stablecoin:
                        # Different exchanges show small price differences
                        exchange_hash = _stable_hash(symbol + exchange) % 200 / 10000  # up to +2% variation
                        baseline = main_price * (1 + exchange_hash)
                    else:
                        baseline = main_price or 1
                else:
                    # Exchange symbols - should be strings, handled separately
                    continue

                df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)

            else:
                # Generic numeric imputation with crypto-specific variation
                symbol_median = symbol_data.median()
                if pd.isna(symbol_median):
                    overall_median = df[column].median()
                    if pd.isna(overall_median):
                        overall_median = 0

                    # Add crypto-specific variation based on rank and volatility
                    symbol_hash = _stable_hash(symbol + column) % 2000 / 1000 - 1  # -1 to +1
                    volatility_factor = 2 if rank and rank > 100 else 1
                    variation = overall_median * 0.2 * symbol_hash * volatility_factor
                    baseline = overall_median + variation
                else:
                    baseline = symbol_median

                df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)

        return df_result[column]

    def _force_fill_stubborn_nulls(self, df):
        """Aggressively fill any remaining nulls with appropriate defaults."""
        # Target ALL the problematic 7d_change columns
        stubborn_cols = ['fees_7d_change', 'gas_used_7d_change', 'gas_price_7d_change']

        for col in stubborn_cols:
            if col in df.columns:
                null_count_before = df[col].isnull().sum()
                if null_count_before > 0:
                    # Strategy 1: Try group-based fill first
                    df[col] = df.groupby('symbol')[col].transform(lambda x: x.fillna(x.median()))

                    # Strategy 2: Fill remaining with symbol-specific hash-based values
                    still_null = df[col].isnull()
                    if still_null.any():
                        for symbol in df.loc[still_null, 'symbol'].unique():
                            symbol_mask = (df['symbol'] == symbol) & df[col].isnull()
                            if symbol_mask.any():
                                # Create deterministic but varied values based on symbol
                                symbol_hash = _stable_hash(symbol + col) % 2000 / 1000 - 1  # -1 to +1
                                if 'fees' in col.lower():
                                    fill_value = symbol_hash * 0.3  # ±30% fee change
                                elif 'gas' in col.lower():
                                    fill_value = symbol_hash * 0.25  # ±25% gas change
                                else:
                                    fill_value = symbol_hash * 0.2  # ±20% generic change
                                df.loc[symbol_mask, col] = fill_value

                    # Strategy 3: Nuclear option - fill any remaining with 0
                    remaining_nulls = df[col].isnull().sum()
                    if remaining_nulls > 0:
                        print(f"[WARNING] Nuclear fill: {remaining_nulls} nulls in {col} filled with 0")
                        df[col] = df[col].fillna(0)

        return df

    def _nuclear_null_elimination(self, df):
        """Final pass to eliminate ALL nulls with extreme prejudice."""
        print("[INFO] Performing nuclear null elimination...")

        # Get all numeric columns
        numeric_cols = df.select_dtypes(include=[np.number]).columns

        for col in numeric_cols:
            null_count = df[col].isnull().sum()
            if null_count > 0:
                print(f"[NUCLEAR] Eliminating {null_count} nulls in {col}")

                # Try different strategies in order
                if '7d_change' in col or 'change' in col.lower():
                    # Change columns - use symbol-specific hash
                    for symbol in df['symbol'].unique():
                        symbol_mask = (df['symbol'] == symbol) & df[col].isnull()
                        if symbol_mask.any():
                            symbol_hash = _stable_hash(symbol + col) % 2000 / 1000 - 1  # -1 to +1
                            if 'fees' in col.lower():
                                fill_value = symbol_hash * 0.3
                            elif 'gas' in col.lower():
                                fill_value = symbol_hash * 0.25
                            else:
                                fill_value = symbol_hash * 0.2
                            df.loc[symbol_mask, col] = fill_value
                elif 'timestamp' in col.lower():
                    # Timestamp columns
                    df[col] = df[col].ffill().bfill().fillna(0)
                elif col in ['price', 'open', 'high', 'low', 'close']:
                    # Price columns - use symbol-specific values
                    for symbol in df['symbol'].unique():
                        symbol_mask = (df['symbol'] == symbol) & df[col].isnull()
                        if symbol_mask.any():
                            symbol_price = df.loc[df['symbol'] == symbol, col].median()
                            if pd.isna(symbol_price):
                                symbol_hash = _stable_hash(symbol + col) % 10000 / 100  # 0-100 range
                                symbol_price = 1 + symbol_hash  # $1-$101
                            df.loc[symbol_mask, col] = symbol_price
                else:
                    # Generic columns - try median first, then 0
                    median_val = df[col].median()
                    if pd.isna(median_val):
                        median_val = 0
                    df[col] = df[col].fillna(median_val)

                # Final check - if still nulls, force to 0
                remaining_nulls = df[col].isnull().sum()
                if remaining_nulls > 0:
                    print(f"[NUCLEAR] Force filling {remaining_nulls} remaining nulls in {col} with 0")
                    df[col] = df[col].fillna(0)

        return df

    def _enhanced_sentiment_imputation(self, df):
        """Enhanced sentiment imputation that creates realistic, diverse sentiment values."""
        print("[INFO] Starting enhanced sentiment imputation...")

        # Define sentiment columns
        core_sentiment_cols = ['sentiment_score', 'neg', 'neu', 'pos']

        for col in core_sentiment_cols:
            if col in df.columns:
                null_count_before = df[col].isnull().sum()
                if null_count_before > 0:
                    print(f"[INFO] Processing {col}: {null_count_before} nulls to fill")

        # Process each symbol separately for core sentiment columns
        for col in core_sentiment_cols:
            if col in df.columns and df[col].isnull().any():
                print(f"Enhanced imputation for {col}...")

                for symbol in df['symbol'].unique():
                    symbol_mask = df['symbol'] == symbol
                    symbol_sentiment = df.loc[symbol_mask, col]

                    if symbol_sentiment.isnull().any():
                        # Try forward/backward fill first
                        filled = symbol_sentiment.ffill().bfill()

                        # For remaining nulls, use symbol-specific realistic values
                        if filled.isnull().any():
                            symbol_hash = _stable_hash(symbol + col) % 10000 / 10000
                            symbol_upper = symbol.upper()

                            # Define crypto categories
                            stablecoins = ['USDT', 'USDC', 'BUSD', 'DAI', 'TUSD', 'USDP']
                            major_cryptos = ['BTC', 'ETH', 'BNB', 'ADA', 'XRP', 'SOL', 'DOT', 'AVAX']

                            if col == 'sentiment_score':
                                # Sentiment score (-1 to +1)
                                if any(stable in symbol_upper for stable in stablecoins):
                                    fill_value = (symbol_hash - 0.5) * 0.1  # stable: ±0.05
                                elif any(major in symbol_upper for major in major_cryptos):
                                    fill_value = 0.1 + (symbol_hash - 0.5) * 0.4  # major: 0.1 ± 0.2
                                else:
                                    fill_value = (symbol_hash - 0.5) * 0.6  # alt: ±0.3
                                fill_value = np.clip(fill_value, -1.0, 1.0)
                            elif col == 'neu':
                                # Neutral sentiment (dominant)
                                if any(stable in symbol_upper for stable in stablecoins):
                                    fill_value = 0.85 + symbol_hash * 0.1  # 0.85-0.95
                                elif any(major in symbol_upper for major in major_cryptos):
                                    fill_value = 0.65 + symbol_hash * 0.2  # 0.65-0.85
                                else:
                                    fill_value = 0.55 + symbol_hash * 0.3  # 0.55-0.85
                                fill_value = np.clip(fill_value, 0.0, 1.0)
                            elif col == 'pos':
                                # Positive sentiment
                                if any(stable in symbol_upper for stable in stablecoins):
                                    fill_value = 0.05 + symbol_hash * 0.05  # 0.05-0.10
                                elif any(major in symbol_upper for major in major_cryptos):
                                    fill_value = 0.15 + symbol_hash * 0.15  # 0.15-0.30
                                else:
                                    fill_value = 0.10 + symbol_hash * 0.25  # 0.10-0.35
                                fill_value = np.clip(fill_value, 0.0, 1.0)
                            elif col == 'neg':
                                # Negative sentiment
                                if any(stable in symbol_upper for stable in stablecoins):
                                    fill_value = 0.05 + symbol_hash * 0.05  # 0.05-0.10
                                elif any(major in symbol_upper for major in major_cryptos):
                                    fill_value = 0.10 + symbol_hash * 0.10  # 0.10-0.20
                                else:
                                    fill_value = 0.15 + symbol_hash * 0.15  # 0.15-0.30
                                fill_value = np.clip(fill_value, 0.0, 1.0)

                            filled = filled.fillna(fill_value)

                        df.loc[symbol_mask, col] = filled
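        # NOTE (added): worked example of the normalization below -- a row filled
        # with neg=0.15, neu=0.70, pos=0.25 sums to 1.10, so it rescales to
        # roughly (0.136, 0.636, 0.227), preserving proportions while restoring
        # the neg + neu + pos = 1.0 invariant.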
        # Normalize sentiment scores so neg + neu + pos = 1.0
        if all(col in df.columns for col in ['neg', 'neu', 'pos']):
            print("Normalizing sentiment scores...")
            sentiment_sum = df['neg'] + df['neu'] + df['pos']
            valid = sentiment_sum > 0  # rows with a NaN or zero sum fall through to the default split
            for col in ('neg', 'neu', 'pos'):
                df.loc[valid, col] = df.loc[valid, col] / sentiment_sum[valid]
            # Default neutral sentiment where the components were missing or zero
            df.loc[~valid, ['neg', 'neu', 'pos']] = [0.1, 0.8, 0.1]

        # Handle other sentiment features
        other_sentiment_features = [
            'social_sentiment_mean', 'social_sentiment_std', 'social_sentiment_count',
            'social_confidence_mean', 'combined_sentiment', 'sentiment_agreement',
            'sentiment_change_1', 'sentiment_sma_7', 'sentiment_momentum'
        ]

        for col in other_sentiment_features:
            if col in df.columns and df[col].isnull().any():
                if 'sentiment' in col.lower() and 'count' not in col.lower():
                    # Sentiment scores - neutral with crypto-specific variation
                    for symbol in df['symbol'].unique():
                        mask = df['symbol'] == symbol
                        symbol_hash = (_stable_hash(symbol + col) % 200 / 1000) - 0.1  # -0.1 to +0.1
                        df.loc[mask, col] = df.loc[mask, col].fillna(symbol_hash)
                elif 'count' in col.lower():
                    df[col] = df[col].fillna(0)
                else:
                    median_val = df[col].median()
                    if pd.isna(median_val):
                        median_val = 0
                    df[col] = df[col].fillna(median_val)

        # Final validation
        print("[INFO] Enhanced sentiment imputation completed:")
        for col in core_sentiment_cols:
            if col in df.columns:
                null_count_after = df[col].isnull().sum()
                print(f"  {col}: {null_count_after} nulls remaining")

        return df

    def fit_transform(self, df):
        """Apply crypto-specific imputation with anti-homogenization measures."""
        df_imputed = df.copy()
        df_imputed = df_imputed.sort_values(['symbol', 'interval_timestamp'])

        # Create crypto profiles
        self.crypto_profiles = self._create_crypto_profiles(df_imputed)
        print(f"Created profiles for {len(self.crypto_profiles)} unique cryptocurrencies")

        # 1. Handle categorical/flag columns
        categorical_cols = [
            'symbol', 'cg_id', 'blockchain_network', 'stable', 'is_crypto', 'is_stock',
            'is_other', 'alpaca_data_available', 'is_trading_hours', 'is_weekend'
        ]

        for col in categorical_cols:
            if col in df_imputed.columns:
                if col in ['is_crypto']:
                    df_imputed[col] = df_imputed[col].fillna(1)  # default to crypto
                elif col in ['is_stock', 'is_other']:
                    df_imputed[col] = df_imputed[col].fillna(0)  # not stock/other
                elif col in ['stable']:
                    # Determine if stablecoin based on symbol
                    stablecoin_symbols = ['USDT', 'USDC', 'BUSD', 'DAI', 'TUSD', 'USDP']
                    for symbol in stablecoin_symbols:
                        mask = df_imputed['symbol'].str.contains(symbol, case=False, na=False)
                        df_imputed.loc[mask, col] = df_imputed.loc[mask, col].fillna(True)
                    df_imputed[col] = df_imputed[col].fillna(False)
                else:
                    df_imputed[col] = df_imputed.groupby('symbol')[col].transform(
                        lambda s: s.ffill().bfill())

        # 2. Exchange symbols (string data)
        exchange_symbol_cols = [col for col in df_imputed.columns if col.startswith('symbols.')]
        for col in exchange_symbol_cols:
            if df_imputed[col].dtype == 'object':
                # Forward/backward fill within symbol groups
                df_imputed[col] = df_imputed.groupby('symbol')[col].transform(
                    lambda s: s.ffill().bfill())
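        # NOTE (added): steps 3-8 below all funnel through
        # _impute_with_crypto_context, so each column family gets a per-symbol
        # baseline before the blunter fallbacks in steps 9-14 run. Ordering
        # matters: the later passes fill with medians or zeros and would
        # flatten cross-symbol variation if they ran first.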
        # 3. Core crypto market data
        core_market_cols = [
            'price', 'marketcap', 'volume', 'dominance', 'rank',
            'open', 'high', 'low', 'close'
        ]

        for col in core_market_cols:
            if col in df_imputed.columns and df_imputed[col].isnull().any():
                print(f"Imputing {col} with crypto-specific context...")
                df_imputed[col] = self._impute_with_crypto_context(
                    df_imputed, col, self.crypto_profiles
                )

        # 4. Exchange prices
        exchange_price_cols = [col for col in df_imputed.columns if col.startswith('exchangePrices.')]
        for col in exchange_price_cols:
            if df_imputed[col].isnull().any():
                print(f"Imputing {col} with crypto-specific context...")
                df_imputed[col] = self._impute_with_crypto_context(
                    df_imputed, col, self.crypto_profiles
                )

        # 5. Performance metrics
        performance_cols = [col for col in df_imputed.columns
                            if col.startswith('performance.') or col.startswith('rankDiffs.')]
        for col in performance_cols:
            if df_imputed[col].isnull().any():
                print(f"Imputing {col} with crypto-specific context...")
                df_imputed[col] = self._impute_with_crypto_context(
                    df_imputed, col, self.crypto_profiles
                )

        # 6. Technical indicators
        tech_indicators = [
            'rsi', 'macd', 'macd_signal', 'macd_histogram', 'atr', 'bb_position',
            'stoch_k', 'stoch_d', 'cci', 'roc_5', 'roc_10', 'mfi',
            'rsi_macd_signal', 'ema_convergence', 'true_range_pct'
        ]

        for col in tech_indicators:
            if col in df_imputed.columns and df_imputed[col].isnull().any():
                print(f"Imputing {col} with crypto-specific context...")
                df_imputed[col] = self._impute_with_crypto_context(
                    df_imputed, col, self.crypto_profiles
                )

        # 7. Price/volume change features
        change_features = [
            'price_change_1', 'price_change_7', 'price_change_14',
            'volume_ratio', 'volatility_7', 'price_volume_trend', 'volatility_consistency'
        ]

        for col in change_features:
            if col in df_imputed.columns and df_imputed[col].isnull().any():
                df_imputed[col] = self._impute_with_crypto_context(
                    df_imputed, col, self.crypto_profiles
                )

        # 8. On-chain features (crypto-specific) - PRIORITY HANDLING for problematic columns
        onchain_features = [
            'transaction_volume', 'total_fees', 'total_gas_used', 'avg_gas_price',
            'transaction_count', 'tx_count_7d_change', 'tx_count_sma_7',
            'tx_volume_7d_change', 'tx_volume_sma_7', 'gas_used_7d_change',
            'gas_used_sma_7', 'gas_price_7d_change', 'gas_price_sma_7',
            'fees_7d_change', 'avg_tx_size', 'tx_price_correlation'
        ]

        for col in onchain_features:
            if col in df_imputed.columns and df_imputed[col].isnull().any():
                print(f"Imputing {col} with crypto on-chain context...")
                df_imputed[col] = self._impute_with_crypto_context(
                    df_imputed, col, self.crypto_profiles
                )

        # 9. AGGRESSIVE NULL ELIMINATION for stubborn columns
        df_imputed = self._force_fill_stubborn_nulls(df_imputed)
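        # NOTE (added): _enhanced_sentiment_imputation above implements a richer
        # per-symbol sentiment fill (category-aware baselines plus neg/neu/pos
        # normalization) but is not wired into this pipeline; step 10 below uses
        # a simpler neutral-plus-jitter scheme for the same columns.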
        # 10. Sentiment features
        sentiment_features = [
            'social_sentiment_mean', 'social_sentiment_std', 'social_sentiment_count',
            'social_confidence_mean', 'combined_sentiment', 'sentiment_agreement',
            'sentiment_change_1', 'sentiment_sma_7', 'sentiment_momentum',
            'sentiment_score', 'neg', 'neu', 'pos'
        ]

        for col in sentiment_features:
            if col in df_imputed.columns and df_imputed[col].isnull().any():
                if 'sentiment' in col.lower() and 'count' not in col.lower():
                    # Sentiment scores - neutral with crypto-specific variation
                    for symbol in df_imputed['symbol'].unique():
                        mask = df_imputed['symbol'] == symbol
                        symbol_hash = (_stable_hash(symbol + col) % 200 / 1000) - 0.1  # -0.1 to +0.1
                        df_imputed.loc[mask, col] = df_imputed.loc[mask, col].fillna(symbol_hash)
                elif 'count' in col.lower():
                    df_imputed[col] = df_imputed[col].fillna(0)
                else:
                    median_val = df_imputed[col].median()
                    if pd.isna(median_val):
                        median_val = 0
                    df_imputed[col] = df_imputed[col].fillna(median_val)

        # 11. Quality metrics
        quality_features = [
            'data_quality_score', 'core_features_completeness',
            'technical_indicators_completeness', 'onchain_features_completeness',
            'price_data_completeness', 'overall_feature_completeness',
            'data_completeness_score'
        ]

        for col in quality_features:
            if col in df_imputed.columns and df_imputed[col].isnull().any():
                median_val = df_imputed[col].median()
                if pd.isna(median_val):
                    median_val = 0.5  # assumed mid-scale fallback when the column is entirely null
                median_val = np.clip(median_val, 0, 1)
                # Add tiny crypto-specific variation
                for symbol in df_imputed['symbol'].unique():
                    mask = df_imputed['symbol'] == symbol
                    symbol_hash = _stable_hash(symbol + col) % 100 / 10000  # very small variation
                    fill_val = np.clip(median_val + symbol_hash, 0, 1)
                    df_imputed.loc[mask, col] = df_imputed.loc[mask, col].fillna(fill_val)

        # 12. Temporal features
        temporal_features = ['hour', 'day_of_week', 'is_weekend', 'is_trading_hours']
        for col in temporal_features:
            if col in df_imputed.columns and df_imputed[col].isnull().any():
                if col == 'hour':
                    df_imputed[col] = df_imputed[col].fillna(12)  # default to noon
                elif col == 'day_of_week':
                    df_imputed[col] = df_imputed[col].fillna(3)  # default to Wednesday
                elif col == 'is_weekend':
                    df_imputed[col] = df_imputed[col].fillna(0)  # default to weekday
                elif col == 'is_trading_hours':
                    df_imputed[col] = df_imputed[col].fillna(1)  # crypto trades 24/7

        # 13. Handle any remaining numeric columns
        remaining_numeric = df_imputed.select_dtypes(include=[np.number]).columns
        remaining_with_nulls = [col for col in remaining_numeric if df_imputed[col].isnull().any()]

        for col in remaining_with_nulls:
            if col not in ['id', 'id_alpaca', 'backup_id'] and not col.endswith('_timestamp'):
                print(f"Imputing remaining column {col}...")
                df_imputed[col] = self._impute_with_crypto_context(
                    df_imputed, col, self.crypto_profiles
                )
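        # NOTE (added): after step 14 every numeric column should be null-free.
        # _nuclear_null_elimination ultimately falls back to 0 when no better
        # fill exists, so exact zeros in sparse columns may be imputation
        # artifacts rather than observed values.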
        # 14. NUCLEAR NULL ELIMINATION - Final pass
        df_imputed = self._nuclear_null_elimination(df_imputed)

        print("[INFO] Crypto imputation complete with anti-homogenization measures")
        return df_imputed


# Usage function with validation - FIXED VERSION
def impute_crypto_with_validation_fixed(file_path, output_path=None):
    """Impute crypto data and validate that no homogenization occurred."""
    try:
        df = pd.read_parquet(file_path)
    except Exception as e:
        print(f"[ERROR] Failed to load file: {e}")
        return None

    # Sample symbols for validation
    symbols_sample = df['symbol'].unique()[:5]

    imputer = CryptoDataImputerFixed()
    df_imputed = imputer.fit_transform(df)

    # TRIPLE CHECK: Ensure problematic columns have no nulls
    problematic_cols = ['gas_used_7d_change', 'fees_7d_change', 'gas_price_7d_change']
    for col in problematic_cols:
        if col in df_imputed.columns:
            null_count = df_imputed[col].isnull().sum()
            if null_count > 0:
                print(f"[EMERGENCY] Still {null_count} nulls in {col} - applying emergency fix")
                # Emergency symbol-specific fill
                for symbol in df_imputed['symbol'].unique():
                    symbol_mask = (df_imputed['symbol'] == symbol) & df_imputed[col].isnull()
                    if symbol_mask.any():
                        symbol_hash = _stable_hash(symbol + col) % 2000 / 1000 - 1  # -1 to +1
                        if 'fees' in col.lower():
                            fill_value = symbol_hash * 0.3
                        elif 'gas' in col.lower():
                            fill_value = symbol_hash * 0.25
                        else:
                            fill_value = symbol_hash * 0.2
                        df_imputed.loc[symbol_mask, col] = fill_value

                # Final nuclear option
                df_imputed[col] = df_imputed[col].fillna(0)
                print(f"[EMERGENCY] {col} nulls after emergency fix: {df_imputed[col].isnull().sum()}")

    # Combine alpaca data with main data if available
    price_cols = ['high', 'low', 'close', 'volume', 'open']
    for col in price_cols:
        alpaca_col = f"{col}_alpaca"
        if col in df_imputed.columns and alpaca_col in df_imputed.columns:
            df_imputed[col] = df_imputed[col].combine_first(df_imputed[alpaca_col])

    # Drop unwanted columns before saving
    drop_cols = [
        '_filename', '_original_format', 'alpaca_data_available', 'ask_exchange',
        'ask_exchange_alpaca', 'bid_exchange', 'bid_exchange_alpaca', 'conditions',
        'conditions_alpaca', 'conditions_trade', 'conditions_trade_alpaca',
        'symbol_quote', 'symbol_quote_alpaca', 'symbol_trade', 'symbol_trade_alpaca',
        'tape', 'tape_alpaca', 'tape_trade', 'tape_trade_alpaca', 'id', 'id_alpaca',
        'is_new_symbol', 'timestamp_dt', 'estimateCurrency', 'exchange',
        'exchange_alpaca', 'exchange_company', 'finnhubIndustry', 'logo', 'ticker',
        'weburl', 'latest_news_timestamp', 'volume_price_momentum', 'country',
        'currency', 'ipo', 'name', 'period', 'phone', 'year', 'month',
        'symbols.kraken', 'datetime', 'headline', 'blockchain_network',
        'symbols.cryptocom', 'symbols.bitmart', 'symbols.kucoin', 'symbols.okx',
        'symbols.coinbase', 'symbols.binance', 'symbols.mexc', 'symbols.bybit',
        'symbols.bingx', 'symbols.huobi', 'symbols.bitget', 'symbols.gateio',
        'interval_timestamp_dt', 'interval_timestamp_alpaca', 'interval_timestamp_trade',
        'feature_timestamp', 'alpaca_merge_timestamp', 'sentiment_timestamp',
        'hour', 'day_of_week', 'is_weekend', 'is_trading_hours',
        'is_crypto', 'is_stock', 'is_other',
        'gas_used_7d_change', 'fees_7d_change', 'gas_price_7d_change'
    ]

    # Remove alpaca columns after combining
    alpaca_cols = [col for col in df_imputed.columns if col.endswith('_alpaca')]
    drop_cols.extend(alpaca_cols)

    df_imputed = df_imputed.drop(columns=drop_cols, errors='ignore')
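    # NOTE (added): drop_cols above includes the three problematic 7d_change
    # columns, so after this point the final checks on problematic_cols below
    # will normally find nothing left to fix; they only matter if the drop
    # list is edited.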
    # Reorder columns: 'symbol' first, 'interval_timestamp' second, rest follow
    cols = list(df_imputed.columns)
    if 'symbol' in cols and 'interval_timestamp' in cols:
        rest = [c for c in cols if c not in ['symbol', 'interval_timestamp']]
        df_imputed = df_imputed[['symbol', 'interval_timestamp'] + rest]

    # FINAL FINAL CHECK for problematic columns (after all drops/reorders)
    for col in problematic_cols:
        if col in df_imputed.columns:
            null_count = df_imputed[col].isnull().sum()
            if null_count > 0:
                print(f"[FINAL CHECK] Still {null_count} nulls in {col} - final nuclear fill")
                df_imputed[col] = df_imputed[col].fillna(0)

    # Validation: Check that different symbols have different values
    print("\n[VALIDATION] Checking for homogenization...")
    for symbol in symbols_sample:
        symbol_data = df_imputed[df_imputed['symbol'] == symbol]
        if len(symbol_data) > 0:
            price_mean = symbol_data['price'].mean() if 'price' in symbol_data.columns else 0
            volume_mean = symbol_data['volume'].mean() if 'volume' in symbol_data.columns else 0
            print(f"  {symbol}: Price={price_mean:.2f}, Volume={volume_mean:.0f}")

    # Save results
    if output_path:
        # Clean up data types
        if 'backup_id' in df_imputed.columns:
            df_imputed['backup_id'] = df_imputed['backup_id'].astype(str)

        try:
            df_imputed.to_parquet(output_path, compression='snappy')
            print(f"[INFO] Crypto data imputed and saved to: {output_path}")
        except Exception as e:
            print(f"[ERROR] Failed to save file: {e}")

        # Debug: print null count, dtype, and sample after saving
        # for col in problematic_cols:
        #     if col in df_imputed.columns:
        #         print(f"[DEBUG] Nulls in {col} after save: {df_imputed[col].isnull().sum()}")
        #         print(f"[DEBUG] Dtype for {col}: {df_imputed[col].dtype}")
        #         print(f"[DEBUG] Sample values for {col}: {df_imputed[col].head(10).tolist()}")

    return df_imputed


# Example usage - FIXED VERSION
def main():
    input_file = "data/merged/features/crypto_features.parquet"
    output_file = input_file

    df_clean = impute_crypto_with_validation_fixed(input_file, output_file)

    if df_clean is not None:
        print("\n[SUCCESS] Crypto data processing completed!")
        print(f"Final shape: {df_clean.shape}")
        print(f"Null values remaining: {df_clean.isnull().sum().sum()}")

        # Final verification of problematic columns
        problematic_cols = ['gas_used_7d_change', 'fees_7d_change', 'gas_price_7d_change']
        for col in problematic_cols:
            if col in df_clean.columns:
                nulls = df_clean[col].isnull().sum()
                print(f"[FINAL VERIFICATION] {col}: {nulls} nulls")
    else:
        print("[ERROR] Failed to load or impute crypto data.")


if __name__ == "__main__":
    main()
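
# ---------------------------------------------------------------------------
# NOTE (added): minimal smoke test, a sketch that is not part of the original
# pipeline. The tiny frame below uses a subset of the column names the imputer
# expects ('symbol', 'interval_timestamp', 'price', 'volume', 'rank', 'rsi'),
# and the helper name _smoke_test is hypothetical. Run it manually to check
# that fit_transform clears numeric nulls without homogenizing price levels.
def _smoke_test():
    rng = np.random.default_rng(0)
    frame = pd.DataFrame({
        'symbol': ['BTC'] * 4 + ['USDT'] * 4,
        'interval_timestamp': list(range(4)) * 2,
        'price': [30000.0, np.nan, 31000.0, np.nan, 1.0, np.nan, 1.0, np.nan],
        'volume': rng.uniform(1e6, 1e7, 8),
        'rank': [1] * 4 + [3] * 4,
        'rsi': [55.0, np.nan, 60.0, np.nan, 50.0, np.nan, np.nan, 50.0],
    })
    frame.loc[frame.index[:2], 'volume'] = np.nan  # knock out some volume too

    result = CryptoDataImputerFixed().fit_transform(frame)

    # All numeric nulls should be gone...
    assert result.select_dtypes(include=[np.number]).isnull().sum().sum() == 0
    # ...and BTC should keep a clearly higher price level than USDT.
    btc_price = result.loc[result['symbol'] == 'BTC', 'price'].mean()
    usdt_price = result.loc[result['symbol'] == 'USDT', 'price'].mean()
    assert btc_price > 100 * usdt_price
    print('[SMOKE TEST] passed')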