import pandas as pd
import numpy as np
from pathlib import Path
import json
import warnings

warnings.filterwarnings('ignore')


class FinalNullValueHandler:
    """
    Advanced final null value handler with symbol-first temporal interpolation.

    Strategy priority:
        1. Same symbol, nearby timestamps (interpolation/extrapolation)
        2. Same symbol, historical mean/median
        3. Similar symbols (same asset class)
        4. Global defaults with symbol-specific variation
    """

    def __init__(self):
        self.crypto_column_defaults = self._define_crypto_defaults()
        self.stock_column_defaults = self._define_stock_defaults()
        self.symbol_profiles = {}
        self.symbol_stats = {}  # Historical statistics per symbol

    def _analyze_symbol_statistics(self, df):
        """Analyze historical statistics for each symbol to guide intelligent filling."""
        stats = {}

        # Sort by timestamp for proper temporal analysis
        if 'interval_timestamp' in df.columns:
            df_sorted = df.sort_values(['symbol', 'interval_timestamp'])
        else:
            df_sorted = df.sort_values('symbol')

        for symbol in df['symbol'].unique():
            symbol_data = df_sorted[df_sorted['symbol'] == symbol].copy()
            symbol_stats = {
                'symbol': symbol,
                'total_records': len(symbol_data),
                'date_range': None,
                'typical_values': {},
                'volatility': {},
                'trends': {},
                'seasonal_patterns': {}
            }

            # Record the date range if a timestamp column is available
            if 'interval_timestamp' in symbol_data.columns:
                timestamps = pd.to_datetime(symbol_data['interval_timestamp'], unit='ms')
                symbol_stats['date_range'] = {
                    'start': timestamps.min(),
                    'end': timestamps.max(),
                    'duration_days': (timestamps.max() - timestamps.min()).days
                }

            # Calculate typical values, volatility, and trends for numerical columns
            numerical_cols = symbol_data.select_dtypes(include=[np.number]).columns
            for col in numerical_cols:
                if col in ['interval_timestamp', 'backup_id']:
                    continue

                col_data = symbol_data[col].dropna()
                if len(col_data) > 0:
                    symbol_stats['typical_values'][col] = {
                        'mean': col_data.mean(),
                        'median': col_data.median(),
                        'std': col_data.std(),
                        'min': col_data.min(),
                        'max': col_data.max(),
                        'q25': col_data.quantile(0.25),
                        'q75': col_data.quantile(0.75),
                        'recent_mean': col_data.tail(min(10, len(col_data))).mean(),  # Last 10 values
                        'data_points': len(col_data)
                    }

                    # Coefficient of variation as a volatility proxy
                    if len(col_data) > 1:
                        symbol_stats['volatility'][col] = col_data.std() / (col_data.mean() + 1e-8)

                    # Estimate a trend if we have timestamp data
                    if 'interval_timestamp' in symbol_data.columns and len(col_data) >= 3:
                        # Simple linear trend over observation order
                        valid_rows = symbol_data[col].notna()
                        if valid_rows.sum() >= 3:
                            x = np.arange(valid_rows.sum())
                            y = symbol_data.loc[valid_rows, col].values
                            try:
                                symbol_stats['trends'][col] = np.polyfit(x, y, 1)[0]
                            except Exception:
                                symbol_stats['trends'][col] = 0

            stats[symbol] = symbol_stats

        return stats
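    # The sketch below illustrates the per-symbol statistics pass on a tiny
    # hypothetical frame. The symbols and values are made up for the example;
    # the method itself is unchanged, and this helper is never called by the
    # pipeline.
    @staticmethod
    def _demo_symbol_statistics():
        """Hypothetical usage sketch for _analyze_symbol_statistics."""
        demo = pd.DataFrame({
            'symbol': ['btc'] * 3 + ['eth'] * 3,
            'interval_timestamp': [1700000000000 + i * 3600000 for i in range(3)] * 2,
            'price': [42000.0, np.nan, 42500.0, 2200.0, 2250.0, np.nan],
        })
        handler = FinalNullValueHandler()
        stats = handler._analyze_symbol_statistics(demo)
        # stats['btc']['typical_values']['price'] now holds mean/median/std/...
        # computed from the two non-null btc prices.
        return stats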
    def _temporal_interpolation_fill(self, df, symbol, column):
        """
        Fill nulls using temporal interpolation within the same symbol.

        Priority:
            1. Linear interpolation between known values
            2. Forward fill from the last known value
            3. Backward fill from the next known value
            4. Trend-based extrapolation for any remaining gaps
        """
        try:
            symbol_mask = df['symbol'] == symbol
            symbol_data = df.loc[symbol_mask].copy()

            if column not in symbol_data.columns or symbol_data[column].notna().sum() == 0:
                return None

            # Sort by timestamp if available and drop duplicate timestamps,
            # which would otherwise break time-based interpolation
            if 'interval_timestamp' in symbol_data.columns:
                symbol_data = symbol_data.sort_values('interval_timestamp')
                symbol_data = symbol_data.drop_duplicates(subset=['interval_timestamp'], keep='first')

            # Reset the index so the positional logic below is safe
            symbol_data = symbol_data.reset_index(drop=True)
            filled_series = symbol_data[column].copy()

            # 1. Linear interpolation (works best with timestamp ordering)
            if 'interval_timestamp' in symbol_data.columns and len(symbol_data) > 1:
                # Try time-based interpolation with a safe fallback
                try:
                    original_index = filled_series.index
                    datetime_index = pd.to_datetime(symbol_data['interval_timestamp'], unit='ms')

                    # Ensure a unique datetime index by nudging duplicates
                    # forward by a few microseconds
                    if datetime_index.duplicated().any():
                        for i, is_dup in enumerate(datetime_index.duplicated(keep='first')):
                            if is_dup:
                                datetime_index.iloc[i] += pd.Timedelta(microseconds=i + 1)

                    filled_series.index = datetime_index
                    filled_series = filled_series.interpolate(method='time')
                    filled_series.index = original_index  # Restore the original index
                except Exception:
                    # Fall back to linear interpolation if time interpolation fails
                    filled_series = filled_series.interpolate(method='linear')
            else:
                filled_series = filled_series.interpolate(method='linear')

            # 2. Forward fill
            filled_series = filled_series.ffill()

            # 3. Backward fill
            filled_series = filled_series.bfill()

            # 4. If nulls remain, extrapolate from historical statistics
            if filled_series.isna().any() and symbol in self.symbol_stats:
                symbol_stat = self.symbol_stats[symbol]
                if column in symbol_stat.get('typical_values', {}):
                    typical_val = symbol_stat['typical_values'][column]['recent_mean']
                    trend = symbol_stat.get('trends', {}).get(column, 0)
                    # Modest trend continuation for the remaining nulls
                    for idx in filled_series[filled_series.isna()].index:
                        filled_series[idx] = typical_val + trend * (idx % 10)

            return filled_series
        except Exception as e:
            # If all else fails, return None to trigger fallback behavior
            print(f"Warning: Temporal interpolation failed for {symbol} {column}: {e}")
            return None

    def _similar_symbol_fill(self, df, symbol, column, asset_type):
        """Fill nulls using similar symbols in the same asset class."""
        if asset_type == 'crypto':
            # For crypto, compare symbols by market-cap rank
            target_stats = self.symbol_stats.get(symbol, {})
            target_rank = target_stats.get('typical_values', {}).get('rank', {}).get('median', 999)

            similar_symbols = []
            for sym, stats in self.symbol_stats.items():
                if sym == symbol:
                    continue
                sym_rank = stats.get('typical_values', {}).get('rank', {}).get('median', 999)
                if abs(sym_rank - target_rank) <= 50:  # Similar rank range
                    similar_symbols.append(sym)
        else:  # stock
            # For stocks, compare symbols by market capitalization
            target_stats = self.symbol_stats.get(symbol, {})
            target_mcap = target_stats.get('typical_values', {}).get('marketCapitalization', {}).get('median', 0)

            similar_symbols = []
            for sym, stats in self.symbol_stats.items():
                if sym == symbol:
                    continue
                sym_mcap = stats.get('typical_values', {}).get('marketCapitalization', {}).get('median', 0)
                if target_mcap > 0 and sym_mcap > 0:
                    ratio = max(sym_mcap, target_mcap) / min(sym_mcap, target_mcap)
                    if ratio <= 5:  # Within 5x market cap
                        similar_symbols.append(sym)

        if not similar_symbols:
            return None

        # Pool values from the similar symbols and take the median,
        # a robust central tendency
        similar_data = df[df['symbol'].isin(similar_symbols)][column].dropna()
        if len(similar_data) > 0:
            return similar_data.median()

        return None
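    # A minimal sketch of the interpolate -> ffill -> bfill cascade used
    # above, on a toy Series. The values are hypothetical; this helper is
    # illustrative only and is never called by the pipeline.
    @staticmethod
    def _demo_interpolation_cascade():
        """Hypothetical sketch: how the three fill stages compose."""
        s = pd.Series([np.nan, 10.0, np.nan, 14.0, np.nan])
        s = s.interpolate(method='linear')  # fills index 2 -> 12.0 and index 4 -> 14.0
        s = s.ffill()                       # no-op here; covers trailing gaps in general
        s = s.bfill()                       # fills the leading gap: index 0 -> 10.0
        return s  # [10.0, 10.0, 12.0, 14.0, 14.0]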
    def _intelligent_symbol_fill(self, df, symbol, column):
        """
        Intelligent filling strategy prioritizing symbol-specific data.

        Returns the best estimate for null values in the given column and
        symbol: a filled Series (from temporal interpolation), a scalar fill
        value, or None when no symbol-level estimate is available.
        """
        # Strategy 1: Temporal interpolation within the same symbol
        temporal_result = self._temporal_interpolation_fill(df, symbol, column)
        if temporal_result is not None and temporal_result.notna().any():
            return temporal_result

        # Strategy 2: Historical statistics from the same symbol
        if symbol in self.symbol_stats and column in self.symbol_stats[symbol]['typical_values']:
            stats = self.symbol_stats[symbol]['typical_values'][column]
            # Pick the central tendency based on how much data we have
            if stats['data_points'] >= 10:
                # Recent mean for frequently updated data
                return stats['recent_mean']
            elif stats['data_points'] >= 3:
                # Median for small datasets (more robust to outliers)
                return stats['median']
            else:
                # Mean for very small datasets
                return stats['mean']

        # Strategy 3: Similar symbols. Classify the asset class from the
        # columns present in the frame (crypto frames carry rank/dominance/
        # performance columns).
        asset_type = 'crypto' if any(
            col in df.columns for col in ['rank', 'dominance', 'performance.day']
        ) else 'stock'
        similar_fill = self._similar_symbol_fill(df, symbol, column, asset_type)
        if similar_fill is not None:
            return similar_fill

        # Strategy 4: Global fallback with symbol variation,
        # handled by the caller's default-value logic
        return None
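    # A sketch of the three-way contract callers rely on: a Series means
    # row-aligned temporal fills, a scalar means one value for every null,
    # and None defers to the column defaults. Frame contents are made up;
    # the helper is not called by the pipeline.
    @staticmethod
    def _demo_fill_dispatch():
        """Hypothetical sketch of the Series / scalar / None contract."""
        demo = pd.DataFrame({
            'symbol': ['btc', 'btc'],
            'interval_timestamp': [1700000000000, 1700003600000],
            'price': [42000.0, np.nan],
        })
        handler = FinalNullValueHandler()
        result = handler._intelligent_symbol_fill(demo, 'btc', 'price')
        if result is None:
            return 'fall back to column defaults'
        if isinstance(result, pd.Series):
            return 'assign row-aligned values from the interpolated series'
        return 'broadcast the scalar into the null rows'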
    def _define_crypto_defaults(self):
        """Define intelligent defaults for crypto-specific columns."""
        return {
            # Crypto market data
            'dominance': 0.001,           # Very small dominance for minor cryptos
            'rank': 999,                  # Low rank for unknown cryptos
            'stable': 0,                  # Most cryptos are not stablecoins (0 instead of False)
            'marketcap': 1000000,         # $1M default market cap
            'transaction_count': 100,     # Minimal transaction count
            'transaction_volume': 10000,  # Minimal transaction volume
            'tx_price_correlation': 0.5,  # Neutral correlation

            # Exchange prices (None: filled later from the main price)
            'exchangePrices.binance': None,
            'exchangePrices.coinbase': None,
            'exchangePrices.kraken': None,
            'exchangePrices.bybit': None,
            'exchangePrices.kucoin': None,
            'exchangePrices.okx': None,
            'exchangePrices.mexc': None,
            'exchangePrices.gateio': None,
            'exchangePrices.bitget': None,
            'exchangePrices.bitmart': None,
            'exchangePrices.bingx': None,
            'exchangePrices.cryptocom': None,

            # Exchange symbols (None: filled later from the main symbol)
            'symbols.binance': None,
            'symbols.coinbase': None,
            'symbols.kraken': None,
            'symbols.bybit': None,
            'symbols.kucoin': None,
            'symbols.okx': None,
            'symbols.mexc': None,
            'symbols.gateio': None,
            'symbols.bitget': None,
            'symbols.bitmart': None,
            'symbols.bingx': None,
            'symbols.cryptocom': None,

            # Performance metrics (neutral / no change)
            'performance.day': 0.0,
            'performance.hour': 0.0,
            'performance.hour4': 0.0,
            'performance.min1': 0.0,
            'performance.min15': 0.0,
            'performance.min5': 0.0,
            'performance.month': 0.0,
            'performance.month3': 0.0,
            'performance.week': 0.0,
            'performance.year': 0.0,

            # Rank differences (no change)
            'rankDiffs.day': 0,
            'rankDiffs.hour': 0,
            'rankDiffs.hour4': 0,
            'rankDiffs.min1': 0,
            'rankDiffs.min15': 0,
            'rankDiffs.min5': 0,
            'rankDiffs.month': 0,
            'rankDiffs.month3': 0,
            'rankDiffs.week': 0,
            'rankDiffs.year': 0,

            # Technical indicators
            'bb_width': 0.02,  # Small Bollinger Band width

            'cg_id': None,  # Derived from the symbol later
        }

    def _define_stock_defaults(self):
        """Define intelligent defaults for stock-specific columns."""
        return {
            # Stock market data
            'stock_market': 'NASDAQ',            # Default market
            'marketCapitalization': 1000000000,  # $1B default
            'shareOutstanding': 100000000,       # 100M shares default
            'mspr': 0,                           # Neutral momentum

            # News and sentiment data
            'news_activity_score_x': 0,
            'news_activity_score_y': 0,
            'news_articles_count_x': 0,
            'news_articles_count_y': 0,
            'news_highlights_count_x': 0,
            'news_highlights_count_y': 0,
            'news_match_score_max_x': 0,
            'news_match_score_max_y': 0,
            'news_match_score_mean_x': 0,
            'news_match_score_mean_y': 0,
            'news_mentions_count_x': 0,
            'news_mentions_count_y': 0,
            'news_sentiment_max_x': 0.5,  # Neutral sentiment
            'news_sentiment_max_y': 0.5,
            'news_sentiment_mean_x': 0.5,
            'news_sentiment_mean_y': 0.5,
            'news_sentiment_min_x': 0.5,
            'news_sentiment_min_y': 0.5,
            'news_sentiment_range_x': 0,
            'news_sentiment_range_y': 0,
            'news_sentiment_std': 0,
            'news_sentiment_std_x': 0,
            'news_sentiment_std_y': 0,

            # Analyst ratings
            'buy': 5,    # Moderate buy recommendations
            'hold': 10,  # More hold recommendations
            'sell': 2,   # Few sell recommendations
            'strongBuy': 3,
            'strongSell': 1,

            # Technical indicators
            'volume_price_momentum': 0.0,  # Neutral momentum
        }

    def _create_symbol_profiles(self, df):
        """Create profiles for each symbol to guide intelligent filling."""
        profiles = {}

        for symbol in df['symbol'].unique():
            symbol_data = df[df['symbol'] == symbol]

            # Determine whether it is crypto or stock
            is_crypto = 'rank' in symbol_data.columns and symbol_data['rank'].notna().any()
            if not is_crypto:
                is_crypto = any(col.startswith('performance.') for col in symbol_data.columns)

            # Calculate key statistics
            profile = {
                'symbol': symbol,
                'is_crypto': is_crypto,
                'total_records': len(symbol_data),
                'data_density': symbol_data.notna().mean().mean(),
                'has_price_data': 'price' in symbol_data.columns and symbol_data['price'].notna().any(),
                'typical_price': symbol_data.get('price', pd.Series([100])).median(),
                'typical_volume': symbol_data.get('volume', pd.Series([1000000])).median(),
                'typical_marketcap': symbol_data.get(
                    'marketcap',
                    symbol_data.get('marketCapitalization', pd.Series([1000000000]))
                ).median()
            }

            profiles[symbol] = profile

        return profiles

    def _intelligent_fill_value(self, df, symbol, column, default_value):
        """Generate an intelligent fill value based on symbol context."""
        # Add symbol-specific variation to prevent homogenization.
        # Note: hash() is salted per process in Python 3, so the variation
        # is stable within a run but not across runs.
        symbol_hash = hash(f"{symbol}_{column}") % 1000
        variation_factor = (symbol_hash / 1000.0 - 0.5) * 0.1  # ±5% variation

        if default_value is None:
            return None
        elif isinstance(default_value, (int, float)):
            if default_value == 0:
                return 0  # Keep zeros as zeros
            else:
                return default_value * (1 + variation_factor)
        else:
            return default_value
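    # A minimal sketch of the hash-based jitter above, with made-up inputs.
    # It shows why distinct symbols get distinct fills from the same default
    # (preventing homogenization); not called by the pipeline.
    @staticmethod
    def _demo_fill_variation():
        """Hypothetical sketch of the ±5% symbol-specific variation."""
        default_value = 1000000  # e.g. the 'marketcap' default
        fills = {}
        for symbol in ('aaa', 'bbb'):
            symbol_hash = hash(f"{symbol}_marketcap") % 1000
            variation_factor = (symbol_hash / 1000.0 - 0.5) * 0.1
            fills[symbol] = default_value * (1 + variation_factor)
        # Each fill lands within ±5% of the default, but differs per symbol
        return fills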
    def _fill_exchange_prices_advanced(self, df):
        """Advanced exchange price filling using the symbol-first strategy."""
        exchange_price_cols = [col for col in df.columns if col.startswith('exchangePrices.')]

        if not exchange_price_cols or 'price' not in df.columns:
            return df

        df_result = df.copy()

        for symbol in df['symbol'].unique():
            symbol_mask = df['symbol'] == symbol
            symbol_data = df.loc[symbol_mask]

            # First try to get the main price from the symbol's own data
            main_price_series = self._intelligent_symbol_fill(df, symbol, 'price')
            if main_price_series is None or (
                isinstance(main_price_series, pd.Series) and main_price_series.isna().all()
            ):
                continue

            if isinstance(main_price_series, pd.Series):
                main_price = main_price_series.median()
            else:
                main_price = main_price_series

            if pd.isna(main_price):
                continue

            # Fill exchange prices for this symbol
            for exchange_col in exchange_price_cols:
                if symbol_data[exchange_col].isna().any():
                    # First try temporal interpolation for this exchange
                    exchange_filled = self._intelligent_symbol_fill(df, symbol, exchange_col)

                    if exchange_filled is not None:
                        if isinstance(exchange_filled, pd.Series):
                            # The interpolated series was built on a reset index,
                            # so assign by position rather than by label
                            if len(exchange_filled) == symbol_mask.sum():
                                df_result.loc[symbol_mask, exchange_col] = exchange_filled.values
                            else:
                                fill_value = exchange_filled.median()
                                if pd.notna(fill_value):
                                    null_mask = symbol_mask & df_result[exchange_col].isna()
                                    df_result.loc[null_mask, exchange_col] = fill_value
                        else:
                            null_mask = symbol_mask & df_result[exchange_col].isna()
                            df_result.loc[null_mask, exchange_col] = exchange_filled
                    else:
                        # Fallback: main price with a small exchange-specific variation
                        exchange_hash = hash(f"{symbol}_{exchange_col}") % 100
                        variation = (exchange_hash / 100.0 - 0.5) * 0.01  # ±0.5%
                        exchange_price = main_price * (1 + variation)

                        null_mask = symbol_mask & df_result[exchange_col].isna()
                        df_result.loc[null_mask, exchange_col] = exchange_price

        return df_result

    def _fill_exchange_symbols(self, df):
        """Fill exchange symbols with the main symbol plus exchange-specific formatting."""
        exchange_symbol_cols = [col for col in df.columns if col.startswith('symbols.')]

        if not exchange_symbol_cols or 'symbol' not in df.columns:
            return df

        df_result = df.copy()

        # Exchange-specific symbol formatting
        exchange_formats = {
            'symbols.binance': lambda s: f"{s.upper()}USDT" if s.lower() != 'bitcoin' else "BTCUSDT",
            'symbols.coinbase': lambda s: f"{s.upper()}-USD",
            'symbols.kraken': lambda s: f"{s.upper()}USD" if len(s) <= 3 else f"{s.upper()}/USD",
            'symbols.bybit': lambda s: f"{s.upper()}USDT",
            'symbols.kucoin': lambda s: f"{s.upper()}-USDT",
            'symbols.okx': lambda s: f"{s.upper()}-USDT",
            'symbols.mexc': lambda s: f"{s.upper()}_USDT",
            'symbols.gateio': lambda s: f"{s.upper()}_USDT",
            'symbols.bitget': lambda s: f"{s.upper()}USDT",
            'symbols.bitmart': lambda s: f"{s.upper()}_USDT",
            'symbols.bingx': lambda s: f"{s.upper()}-USDT",
            'symbols.cryptocom': lambda s: f"{s.upper()}_USDT"
        }

        for symbol in df['symbol'].unique():
            symbol_mask = df['symbol'] == symbol

            for exchange_col in exchange_symbol_cols:
                if df.loc[symbol_mask, exchange_col].isna().all():
                    formatter = exchange_formats.get(exchange_col, lambda s: s.upper())
                    try:
                        df_result.loc[symbol_mask, exchange_col] = formatter(symbol)
                    except Exception:
                        df_result.loc[symbol_mask, exchange_col] = symbol.upper()

        return df_result

    def _fill_cg_id(self, df):
        """Fill the CoinGecko ID based on the symbol."""
        if 'cg_id' not in df.columns:
            return df

        df_result = df.copy()

        # Common CoinGecko ID mappings
        cg_id_mapping = {
            'bitcoin': 'bitcoin', 'btc': 'bitcoin',
            'ethereum': 'ethereum', 'eth': 'ethereum',
            'binancecoin': 'binancecoin', 'bnb': 'binancecoin',
            'cardano': 'cardano', 'ada': 'cardano',
            'solana': 'solana', 'sol': 'solana',
            'xrp': 'ripple', 'ripple': 'ripple',
            'dogecoin': 'dogecoin', 'doge': 'dogecoin',
            'polkadot': 'polkadot', 'dot': 'polkadot',
            'avalanche-2': 'avalanche-2', 'avax': 'avalanche-2',
            'chainlink': 'chainlink', 'link': 'chainlink',
            'polygon': 'matic-network', 'matic': 'matic-network',
            'litecoin': 'litecoin', 'ltc': 'litecoin',
            'uniswap': 'uniswap', 'uni': 'uniswap'
        }

        for symbol in df['symbol'].unique():
            symbol_mask = df['symbol'] == symbol
            if df.loc[symbol_mask, 'cg_id'].isna().all():
                cg_id = cg_id_mapping.get(symbol.lower(), symbol.lower())
                df_result.loc[symbol_mask, 'cg_id'] = cg_id

        return df_result
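    # A quick sketch of the per-exchange formatting above, with a made-up
    # ticker. Not called by the pipeline.
    @staticmethod
    def _demo_exchange_symbol_formats():
        """Hypothetical sketch: 'sol' rendered per exchange convention."""
        return {
            'symbols.binance': "SOLUSDT",   # f"{s.upper()}USDT"
            'symbols.coinbase': "SOL-USD",  # f"{s.upper()}-USD"
            'symbols.kraken': "SOLUSD",     # len('sol') <= 3, so no slash
            'symbols.mexc': "SOL_USDT",     # f"{s.upper()}_USDT"
        }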
    def process_crypto_features(self, df):
        """Process crypto features with advanced symbol-first null handling."""
        print("Processing crypto features with symbol-first strategy...")
        df_result = df.copy()

        # Step 1: Analyze symbol statistics for intelligent filling
        print("Analyzing symbol statistics...")
        self.symbol_stats = self._analyze_symbol_statistics(df_result)
        print(f"Analyzed {len(self.symbol_stats)} symbols")

        # Step 2: Create symbol profiles
        self.symbol_profiles = self._create_symbol_profiles(df_result)

        # Step 3: Symbol-first null handling for key columns
        priority_columns = [
            'price', 'volume', 'marketcap', 'dominance', 'rank',
            'performance.day', 'performance.week', 'performance.month',
            'rsi', 'macd', 'transaction_count', 'transaction_volume'
        ]

        for column in priority_columns:
            if column in df_result.columns and df_result[column].isna().any():
                print(f"Processing {column} with symbol-first strategy...")

                for symbol in df_result['symbol'].unique():
                    symbol_mask = df_result['symbol'] == symbol
                    null_mask = df_result[column].isna()
                    fill_mask = symbol_mask & null_mask

                    if fill_mask.any():
                        # Use intelligent symbol-first filling
                        fill_result = self._intelligent_symbol_fill(df_result, symbol, column)

                        if fill_result is not None:
                            if isinstance(fill_result, pd.Series):
                                # A Series came back from temporal interpolation;
                                # make sure it aligns with the symbol's rows
                                symbol_indices = df_result[symbol_mask].index
                                if len(fill_result) == len(symbol_indices):
                                    # Map the series values onto the original indices
                                    for i, idx in enumerate(symbol_indices):
                                        if pd.notna(fill_result.iloc[i]):
                                            df_result.loc[idx, column] = fill_result.iloc[i]
                                else:
                                    # Lengths differ (e.g. duplicates were dropped):
                                    # fall back to the series median
                                    fill_value = fill_result.median()
                                    if pd.notna(fill_value):
                                        df_result.loc[fill_mask, column] = fill_value
                            else:
                                # A scalar value came back
                                df_result.loc[fill_mask, column] = fill_result

        # Step 4: Handle exchange prices with cross-reference to the main price
        df_result = self._fill_exchange_prices_advanced(df_result)

        # Step 5: Handle exchange symbols with proper formatting
        df_result = self._fill_exchange_symbols(df_result)

        # Step 6: Handle CoinGecko IDs
        df_result = self._fill_cg_id(df_result)

        # Step 7: Fill remaining columns with intelligent defaults
        for column in df_result.columns:
            if df_result[column].isna().any():
                default_value = self.crypto_column_defaults.get(column)
                if default_value is not None:
                    for symbol in df_result['symbol'].unique():
                        symbol_mask = df_result['symbol'] == symbol
                        null_mask = df_result[column].isna()
                        fill_mask = symbol_mask & null_mask

                        if fill_mask.any():
                            try:
                                fill_value = self._intelligent_fill_value(
                                    df_result, symbol, column, default_value
                                )
                                df_result.loc[fill_mask, column] = fill_value
                            except Exception as e:
                                print(f"Warning: Failed to fill {column} for {symbol}: {e}")
                                # Skip this column for this symbol
                                continue

        return df_result
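    # An end-to-end sketch of the crypto pipeline on a tiny hypothetical
    # frame; useful as a smoke test. Symbols and values are made up, and the
    # helper is not called anywhere in the pipeline.
    @staticmethod
    def _demo_process_crypto():
        """Hypothetical smoke test for process_crypto_features."""
        demo = pd.DataFrame({
            'symbol': ['sol', 'sol', 'sol'],
            'interval_timestamp': [1700000000000, 1700003600000, 1700007200000],
            'price': [150.0, np.nan, 152.0],
            'rank': [5, 5, np.nan],
            'dominance': [np.nan, np.nan, np.nan],
        })
        handler = FinalNullValueHandler()
        filled = handler.process_crypto_features(demo)
        # price at t1 is interpolated to ~151.0; rank is forward-filled to 5;
        # dominance falls through to the 0.001 default with ±5% jitter
        return filled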
    def process_stock_features(self, df):
        """Process stock features with advanced symbol-first null handling."""
        print("Processing stock features with symbol-first strategy...")
        df_result = df.copy()

        # Step 1: Analyze symbol statistics for intelligent filling
        print("Analyzing symbol statistics...")
        self.symbol_stats = self._analyze_symbol_statistics(df_result)
        print(f"Analyzed {len(self.symbol_stats)} symbols")

        # Step 2: Create symbol profiles
        self.symbol_profiles = self._create_symbol_profiles(df_result)

        # Step 3: Symbol-first null handling for key columns
        priority_columns = [
            'close', 'open', 'high', 'low', 'volume', 'prev_close',
            'marketCapitalization', 'shareOutstanding',
            'rsi', 'macd', 'atr', 'bb_position',
            'news_sentiment_mean_x', 'news_sentiment_mean_y',
            'buy', 'sell', 'hold', 'strongBuy', 'strongSell'
        ]

        for column in priority_columns:
            if column in df_result.columns and df_result[column].isna().any():
                print(f"Processing {column} with symbol-first strategy...")

                for symbol in df_result['symbol'].unique():
                    symbol_mask = df_result['symbol'] == symbol
                    null_mask = df_result[column].isna()
                    fill_mask = symbol_mask & null_mask

                    if fill_mask.any():
                        # Use intelligent symbol-first filling
                        fill_result = self._intelligent_symbol_fill(df_result, symbol, column)

                        if fill_result is not None:
                            if isinstance(fill_result, pd.Series):
                                # A Series came back from temporal interpolation;
                                # make sure it aligns with the symbol's rows
                                symbol_indices = df_result[symbol_mask].index
                                if len(fill_result) == len(symbol_indices):
                                    # Map the series values onto the original indices
                                    for i, idx in enumerate(symbol_indices):
                                        if pd.notna(fill_result.iloc[i]):
                                            df_result.loc[idx, column] = fill_result.iloc[i]
                                else:
                                    # Lengths differ: fall back to the series median
                                    fill_value = fill_result.median()
                                    if pd.notna(fill_value):
                                        df_result.loc[fill_mask, column] = fill_value
                            else:
                                # A scalar value came back
                                df_result.loc[fill_mask, column] = fill_result

        # Step 4: Fill remaining columns with intelligent defaults
        for column in df_result.columns:
            if df_result[column].isna().any():
                default_value = self.stock_column_defaults.get(column)
                if default_value is not None:
                    for symbol in df_result['symbol'].unique():
                        symbol_mask = df_result['symbol'] == symbol
                        null_mask = df_result[column].isna()
                        fill_mask = symbol_mask & null_mask

                        if fill_mask.any():
                            try:
                                fill_value = self._intelligent_fill_value(
                                    df_result, symbol, column, default_value
                                )
                                df_result.loc[fill_mask, column] = fill_value
                            except Exception as e:
                                print(f"Warning: Failed to fill {column} for {symbol}: {e}")
                                # Skip this column for this symbol
                                continue

        return df_result

    def generate_report(self, df_before, df_after, feature_type):
        """Generate a comprehensive report of null handling, with symbol-first strategy details."""
        before_nulls = df_before.isnull().sum()
        after_nulls = df_after.isnull().sum()
        null_reduction = before_nulls - after_nulls
        columns_fixed = null_reduction[null_reduction > 0]

        # Analyze per-symbol coverage
        symbol_analysis = {}
        if 'symbol' in df_before.columns:
            for symbol in df_before['symbol'].unique():
                symbol_before = int(df_before[df_before['symbol'] == symbol].isnull().sum().sum())
                symbol_after = int(df_after[df_after['symbol'] == symbol].isnull().sum().sum())
                symbol_analysis[symbol] = {
                    'nulls_before': symbol_before,
                    'nulls_after': symbol_after,
                    'nulls_filled': symbol_before - symbol_after,
                    'records': int(len(df_before[df_before['symbol'] == symbol]))
                }

        # Analyze per-date coverage if a timestamp is available
        temporal_analysis = {}
        if 'interval_timestamp' in df_before.columns:
            df_before_ts = df_before.copy()
            df_after_ts = df_after.copy()
            df_before_ts['date'] = pd.to_datetime(df_before_ts['interval_timestamp'], unit='ms').dt.date
            df_after_ts['date'] = pd.to_datetime(df_after_ts['interval_timestamp'], unit='ms').dt.date

            for date in df_before_ts['date'].unique():
                date_before = int(df_before_ts[df_before_ts['date'] == date].isnull().sum().sum())
                date_after = int(df_after_ts[df_after_ts['date'] == date].isnull().sum().sum())
                temporal_analysis[str(date)] = {
                    'nulls_before': date_before,
                    'nulls_after': date_after,
                    'nulls_filled': date_before - date_after
                }

        report = {
            'feature_type': feature_type,
            'timestamp': pd.Timestamp.now().isoformat(),
            'strategy': 'symbol-first-temporal-interpolation',
            'total_rows': int(len(df_after)),
            'total_columns': int(len(df_after.columns)),
            'unique_symbols': int(len(df_after['symbol'].unique())) if 'symbol' in df_after.columns else 0,
            'columns_with_nulls_before': int((before_nulls > 0).sum()),
            'columns_with_nulls_after': int((after_nulls > 0).sum()),
            'total_nulls_before': int(before_nulls.sum()),
            'total_nulls_after': int(after_nulls.sum()),
            'total_nulls_filled': int(null_reduction.sum()),
            'columns_fixed': int(len(columns_fixed)),
            'null_reduction_rate': float(
                (null_reduction.sum() / before_nulls.sum()) if before_nulls.sum() > 0 else 0
            ),
            'remaining_null_columns': {str(k): int(v) for k, v in after_nulls[after_nulls > 0].to_dict().items()},
            'fixed_columns_detail': {str(k): int(v) for k, v in null_reduction[null_reduction > 0].to_dict().items()},
            'symbol_analysis': symbol_analysis,
            'temporal_analysis': temporal_analysis,
            'strategy_details': {
                'symbol_stats_analyzed': len(self.symbol_stats),
                'temporal_interpolation_used': True,
                'similar_symbol_fallback': True,
                'intelligent_defaults': True
            }
        }

        return report
def process_crypto_features_file(input_path, output_path=None):
    """Process a crypto features file."""
    if output_path is None:
        output_path = input_path

    print(f"Loading crypto features from {input_path}...")
    df = pd.read_parquet(input_path)
    print(f"Loaded {len(df)} rows with {len(df.columns)} columns")
    print(f"Null values before processing: {df.isnull().sum().sum()}")

    handler = FinalNullValueHandler()
    df_processed = handler.process_crypto_features(df)

    print(f"Null values after processing: {df_processed.isnull().sum().sum()}")

    # Generate the report
    report = handler.generate_report(df, df_processed, 'crypto')

    # Save the processed data
    df_processed.to_parquet(output_path, index=False)
    print(f"Saved processed crypto features to {output_path}")

    # Save the report
    report_path = str(output_path).replace('.parquet', '_null_handling_report.json')
    with open(report_path, 'w') as f:
        json.dump(report, f, indent=2)
    print(f"Saved processing report to {report_path}")

    return df_processed, report


def process_stock_features_file(input_path, output_path=None):
    """Process a stock features file."""
    if output_path is None:
        output_path = input_path

    print(f"Loading stock features from {input_path}...")
    df = pd.read_parquet(input_path)
    print(f"Loaded {len(df)} rows with {len(df.columns)} columns")
    print(f"Null values before processing: {df.isnull().sum().sum()}")

    handler = FinalNullValueHandler()
    df_processed = handler.process_stock_features(df)

    print(f"Null values after processing: {df_processed.isnull().sum().sum()}")

    # Generate the report
    report = handler.generate_report(df, df_processed, 'stock')

    # Save the processed data
    df_processed.to_parquet(output_path, index=False)
    print(f"Saved processed stock features to {output_path}")

    # Save the report
    report_path = str(output_path).replace('.parquet', '_null_handling_report.json')
    with open(report_path, 'w') as f:
        json.dump(report, f, indent=2)
    print(f"Saved processing report to {report_path}")

    return df_processed, report
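# A hypothetical usage sketch for the file-level entry points: pointing the
# handler at custom parquet paths. The paths are placeholders, not part of
# the pipeline's layout; main() below covers the default locations.
def demo_process_custom_paths():
    """Hypothetical sketch: process a parquet file at a custom location."""
    input_path = Path("data/example/crypto_features.parquet")            # placeholder
    output_path = Path("data/example/crypto_features_filled.parquet")    # placeholder
    if input_path.exists():
        df_processed, report = process_crypto_features_file(input_path, output_path)
        print(f"Filled {report['total_nulls_filled']} nulls")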
def main():
    """Process both crypto and stock feature files."""
    crypto_path = Path("data/merged/features/crypto_features.parquet")
    stocks_path = Path("data/merged/features/stocks_features.parquet")

    processed_files = []

    # Process crypto features
    if crypto_path.exists():
        try:
            df_crypto, report_crypto = process_crypto_features_file(crypto_path)
            processed_files.append(('crypto', crypto_path, report_crypto))
            print(f"✓ Crypto features processed: {report_crypto['total_nulls_filled']} nulls filled")
        except Exception as e:
            print(f"✗ Error processing crypto features: {e}")
    else:
        print(f"Warning: {crypto_path} not found")

    # Process stock features
    if stocks_path.exists():
        try:
            df_stocks, report_stocks = process_stock_features_file(stocks_path)
            processed_files.append(('stocks', stocks_path, report_stocks))
            print(f"✓ Stock features processed: {report_stocks['total_nulls_filled']} nulls filled")
        except Exception as e:
            print(f"✗ Error processing stock features: {e}")
    else:
        print(f"Warning: {stocks_path} not found")

    # Summary report
    if processed_files:
        print("\n" + "=" * 60)
        print("FINAL NULL VALUE HANDLING SUMMARY")
        print("=" * 60)

        total_nulls_filled = 0
        for file_type, file_path, report in processed_files:
            total_nulls_filled += report['total_nulls_filled']
            print(f"\n{file_type.upper()} FEATURES:")
            print(f"  File: {file_path}")
            print(f"  Rows: {report['total_rows']:,}")
            print(f"  Columns: {report['total_columns']}")
            print(f"  Nulls filled: {report['total_nulls_filled']:,}")
            print(f"  Columns fixed: {report['columns_fixed']}")
            print(f"  Remaining null columns: {len(report['remaining_null_columns'])}")
            if report['remaining_null_columns']:
                print(f"  Still have nulls: {list(report['remaining_null_columns'].keys())}")

        print(f"\nTOTAL NULLS FILLED ACROSS ALL FILES: {total_nulls_filled:,}")
        print("=" * 60)
    else:
        print("No files were processed successfully.")


if __name__ == "__main__":
    main()