|
import json
import warnings
import zlib
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings('ignore')
|
|
|
class FinalNullValueHandler:
    """
    Advanced final null value handler with symbol-first temporal interpolation.

    Strategy Priority:
    1. Same symbol, nearby timestamps (interpolation/extrapolation)
    2. Same symbol, historical mean/median
    3. Similar symbols (same asset class)
    4. Global defaults with symbol-specific variation

    Input frames must contain a ``symbol`` column. When an
    ``interval_timestamp`` column is present it is interpreted as epoch
    milliseconds and used to order each symbol's rows in time.
    """

    # Exchange suffixes used to build the 'exchangePrices.*' / 'symbols.*' keys.
    _EXCHANGES = (
        'binance', 'coinbase', 'kraken', 'bybit', 'kucoin', 'okx',
        'mexc', 'gateio', 'bitget', 'bitmart', 'bingx', 'cryptocom',
    )

    # Horizon suffixes used to build the 'performance.*' / 'rankDiffs.*' keys.
    _HORIZONS = (
        'day', 'hour', 'hour4', 'min1', 'min15', 'min5',
        'month', 'month3', 'week', 'year',
    )

    # Ticker templates for exchanges with a single fixed format; binance and
    # kraken have special cases handled in _format_exchange_symbol.
    _EXCHANGE_SYMBOL_TEMPLATES = {
        'symbols.coinbase': '{}-USD',
        'symbols.bybit': '{}USDT',
        'symbols.kucoin': '{}-USDT',
        'symbols.okx': '{}-USDT',
        'symbols.mexc': '{}_USDT',
        'symbols.gateio': '{}_USDT',
        'symbols.bitget': '{}USDT',
        'symbols.bitmart': '{}_USDT',
        'symbols.bingx': '{}-USDT',
        'symbols.cryptocom': '{}_USDT',
    }

    # Lower-cased symbol -> CoinGecko id.
    _CG_ID_MAPPING = {
        'bitcoin': 'bitcoin',
        'btc': 'bitcoin',
        'ethereum': 'ethereum',
        'eth': 'ethereum',
        'binancecoin': 'binancecoin',
        'bnb': 'binancecoin',
        'cardano': 'cardano',
        'ada': 'cardano',
        'solana': 'solana',
        'sol': 'solana',
        'xrp': 'ripple',
        'ripple': 'ripple',
        'dogecoin': 'dogecoin',
        'doge': 'dogecoin',
        'polkadot': 'polkadot',
        'dot': 'polkadot',
        'avalanche-2': 'avalanche-2',
        'avax': 'avalanche-2',
        'chainlink': 'chainlink',
        'link': 'chainlink',
        'polygon': 'matic-network',
        'matic': 'matic-network',
        'litecoin': 'litecoin',
        'ltc': 'litecoin',
        'uniswap': 'uniswap',
        'uni': 'uniswap',
    }

    def __init__(self):
        self.crypto_column_defaults = self._define_crypto_defaults()
        self.stock_column_defaults = self._define_stock_defaults()
        self.symbol_profiles = {}
        self.symbol_stats = {}

    @staticmethod
    def _stable_bucket(key, modulus):
        """Map ``key`` to an integer in ``[0, modulus)`` reproducibly.

        The built-in ``hash()`` is salted per interpreter process for strings,
        so it would give different fill values on every run; CRC32 does not.
        """
        return zlib.crc32(str(key).encode('utf-8')) % modulus

    def _analyze_symbol_statistics(self, df):
        """Analyze historical statistics for each symbol to guide intelligent filling.

        Returns:
            dict keyed by symbol. Each value holds ``total_records``,
            ``date_range`` (None without ``interval_timestamp``), and per
            numeric column ``typical_values``, ``volatility`` and ``trends``.
        """
        has_timestamps = 'interval_timestamp' in df.columns
        sort_keys = ['symbol', 'interval_timestamp'] if has_timestamps else 'symbol'
        df_sorted = df.sort_values(sort_keys)

        stats = {}
        for symbol in df['symbol'].unique():
            symbol_data = df_sorted[df_sorted['symbol'] == symbol]

            symbol_stats = {
                'symbol': symbol,
                'total_records': len(symbol_data),
                'date_range': None,
                'typical_values': {},
                'volatility': {},
                'trends': {},
                'seasonal_patterns': {},
            }

            if has_timestamps:
                timestamps = pd.to_datetime(symbol_data['interval_timestamp'], unit='ms')
                start, end = timestamps.min(), timestamps.max()
                symbol_stats['date_range'] = {
                    'start': start,
                    'end': end,
                    'duration_days': (end - start).days,
                }

            numerical_cols = symbol_data.select_dtypes(include=[np.number]).columns
            for col in numerical_cols:
                # Identifier-like columns carry no fillable signal.
                if col in ('interval_timestamp', 'backup_id'):
                    continue

                col_data = symbol_data[col].dropna()
                if len(col_data) == 0:
                    continue

                symbol_stats['typical_values'][col] = {
                    'mean': col_data.mean(),
                    'median': col_data.median(),
                    'std': col_data.std(),
                    'min': col_data.min(),
                    'max': col_data.max(),
                    'q25': col_data.quantile(0.25),
                    'q75': col_data.quantile(0.75),
                    'recent_mean': col_data.tail(10).mean(),
                    'data_points': len(col_data),
                }

                if len(col_data) > 1:
                    # Coefficient of variation; abs() keeps it non-negative and
                    # avoids a near-zero denominator for slightly negative means.
                    symbol_stats['volatility'][col] = (
                        col_data.std() / (abs(col_data.mean()) + 1e-8)
                    )

                if has_timestamps and len(col_data) >= 3:
                    # Slope of a least-squares line over the time-ordered
                    # non-null observations (x is the observation ordinal).
                    y = col_data.to_numpy()
                    x = np.arange(len(y))
                    try:
                        symbol_stats['trends'][col] = np.polyfit(x, y, 1)[0]
                    except (np.linalg.LinAlgError, ValueError, TypeError):
                        symbol_stats['trends'][col] = 0

            stats[symbol] = symbol_stats

        return stats

    @staticmethod
    def _interpolate_over_time(values, timestamps):
        """Estimate a value for every row from the symbol's time series.

        Args:
            values: Series of observations with a positional index.
            timestamps: Series of epoch-millisecond timestamps, same index.

        Returns:
            Series with the same positional index holding the interpolated /
            extrapolated estimate for each row's timestamp (NaN where the
            timestamp itself is missing).
        """
        # One observation per distinct timestamp (first non-null), sorted in time.
        curve = values.groupby(timestamps.to_numpy(), sort=True).first()
        raw_index = curve.index

        if len(curve) > 1:
            try:
                timed = curve.copy()
                timed.index = pd.to_datetime(raw_index, unit='ms')
                timed = timed.interpolate(method='time')
                timed.index = raw_index
                curve = timed
            except (TypeError, ValueError, OverflowError):
                # Timestamps not convertible: fall back to evenly spaced steps.
                curve = curve.interpolate(method='linear')

        # Extrapolate: carry the last known value forward, then the first back.
        curve = curve.ffill().bfill()
        return timestamps.map(curve)

    def _temporal_interpolation_fill(self, df, symbol, column):
        """
        Fill nulls using temporal interpolation within the same symbol

        Priority:
        1. Linear (time-weighted) interpolation between known values
        2. Forward fill from last known value
        3. Backward fill from next known value
        4. Recent mean plus trend from ``self.symbol_stats`` for anything left

        Returns:
            A Series with one entry per row of ``symbol`` in ``df``, carrying
            the same index labels in the same order as those rows, so callers
            can write it back without re-alignment. Existing non-null values
            are returned unchanged. Returns None when the column is absent,
            the symbol has no known value, or interpolation fails.
        """
        try:
            if column not in df.columns:
                return None

            symbol_mask = df['symbol'] == symbol
            original = df.loc[symbol_mask, column]
            if not original.notna().any():
                return None

            # Work on a positional index so duplicate or non-default index
            # labels in df cannot cause mis-alignment.
            values = original.reset_index(drop=True)

            if 'interval_timestamp' in df.columns:
                timestamps = df.loc[symbol_mask, 'interval_timestamp'].reset_index(drop=True)
                estimates = self._interpolate_over_time(values, timestamps)
            else:
                estimates = values.interpolate(method='linear').ffill().bfill()

            # Never replace an observed value with an estimate.
            filled = values.where(values.notna(), estimates)

            if filled.isna().any():
                symbol_stat = self.symbol_stats.get(symbol, {})
                typical = symbol_stat.get('typical_values', {}).get(column)
                if typical is not None:
                    trend = symbol_stat.get('trends', {}).get(column, 0)
                    missing = np.flatnonzero(filled.isna().to_numpy())
                    filled.iloc[missing] = typical['recent_mean'] + trend * (missing % 10)

            filled.index = original.index
            return filled

        except Exception as e:
            # Best effort by design: the caller falls through to other strategies.
            print(f"Warning: Temporal interpolation failed for {symbol} {column}: {e}")
            return None

    def _similar_symbol_fill(self, df, symbol, column, asset_type):
        """
        Fill nulls using similar symbols in the same asset class

        For ``asset_type == 'crypto'`` symbols are similar when their median
        ``rank`` differs by at most 50 (missing rank counts as 999). Otherwise
        symbols are similar when both have a positive median
        ``marketCapitalization`` within a factor of 5.

        Returns:
            Median of ``column`` over the similar symbols, or None when there
            is no similar symbol or no data.
        """
        if column not in df.columns:
            return None

        def typical_median(symbol_stat, metric, default):
            return symbol_stat.get('typical_values', {}).get(metric, {}).get('median', default)

        target_stats = self.symbol_stats.get(symbol, {})
        candidates = [
            (sym, stat) for sym, stat in self.symbol_stats.items() if sym != symbol
        ]

        if asset_type == 'crypto':
            target_rank = typical_median(target_stats, 'rank', 999)
            similar_symbols = [
                sym for sym, stat in candidates
                if abs(typical_median(stat, 'rank', 999) - target_rank) <= 50
            ]
        else:
            target_mcap = typical_median(target_stats, 'marketCapitalization', 0)
            similar_symbols = []
            if target_mcap > 0:
                for sym, stat in candidates:
                    sym_mcap = typical_median(stat, 'marketCapitalization', 0)
                    if sym_mcap > 0:
                        ratio = max(sym_mcap, target_mcap) / min(sym_mcap, target_mcap)
                        if ratio <= 5:
                            similar_symbols.append(sym)

        if not similar_symbols:
            return None

        similar_data = df.loc[df['symbol'].isin(similar_symbols), column].dropna()
        if len(similar_data) > 0:
            return similar_data.median()

        return None

    def _intelligent_symbol_fill(self, df, symbol, column, allow_similar=True):
        """
        Intelligent filling strategy prioritizing symbol-specific data

        Args:
            df: Frame containing ``symbol`` and ``column``.
            symbol: Symbol whose nulls should be estimated.
            column: Column to estimate.
            allow_similar: When False, skip the cross-symbol fallback and use
                only the symbol's own data.

        Returns:
            A Series aligned to the symbol's rows (temporal strategy), a scalar
            (historical or similar-symbol strategy), or None when no strategy
            produced an estimate.
        """
        # Strategy 1: temporal interpolation within the symbol.
        temporal_result = self._temporal_interpolation_fill(df, symbol, column)
        if temporal_result is not None and temporal_result.notna().any():
            return temporal_result

        # Strategy 2: the symbol's own historical statistics.
        if symbol in self.symbol_stats and column in self.symbol_stats[symbol]['typical_values']:
            stats = self.symbol_stats[symbol]['typical_values'][column]
            if stats['data_points'] >= 10:
                return stats['recent_mean']
            if stats['data_points'] >= 3:
                return stats['median']
            return stats['mean']

        if not allow_similar:
            return None

        # Strategy 3: similar symbols. The frame is treated as crypto when it
        # has a 'symbol' column plus any crypto-only column.
        crypto_markers = ('rank', 'dominance', 'performance.day')
        is_crypto = 'symbol' in df.columns and any(
            marker in df.columns for marker in crypto_markers
        )
        asset_type = 'crypto' if is_crypto else 'stock'

        similar_fill = self._similar_symbol_fill(df, symbol, column, asset_type)
        if similar_fill is not None:
            return similar_fill

        return None

    def _define_crypto_defaults(self):
        """Define intelligent defaults for crypto-specific columns.

        A value of None means "no static default"; such columns are skipped by
        the default-filling pass.
        """
        defaults = {
            'dominance': 0.001,
            'rank': 999,
            'stable': 0,
            'marketcap': 1000000,
            'transaction_count': 100,
            'transaction_volume': 10000,
            'tx_price_correlation': 0.5,
        }
        defaults.update({f'exchangePrices.{name}': None for name in self._EXCHANGES})
        defaults.update({f'symbols.{name}': None for name in self._EXCHANGES})
        defaults.update({f'performance.{span}': 0.0 for span in self._HORIZONS})
        defaults.update({f'rankDiffs.{span}': 0 for span in self._HORIZONS})
        defaults['bb_width'] = 0.02
        defaults['cg_id'] = None
        return defaults

    def _define_stock_defaults(self):
        """Define intelligent defaults for stock-specific columns"""
        return {
            'stock_market': 'NASDAQ',
            'marketCapitalization': 1000000000,
            'shareOutstanding': 100000000,
            'mspr': 0,

            'news_activity_score_x': 0,
            'news_activity_score_y': 0,
            'news_articles_count_x': 0,
            'news_articles_count_y': 0,
            'news_highlights_count_x': 0,
            'news_highlights_count_y': 0,
            'news_match_score_max_x': 0,
            'news_match_score_max_y': 0,
            'news_match_score_mean_x': 0,
            'news_match_score_mean_y': 0,
            'news_mentions_count_x': 0,
            'news_mentions_count_y': 0,
            'news_sentiment_max_x': 0.5,
            'news_sentiment_max_y': 0.5,
            'news_sentiment_mean_x': 0.5,
            'news_sentiment_mean_y': 0.5,
            'news_sentiment_min_x': 0.5,
            'news_sentiment_min_y': 0.5,
            'news_sentiment_range_x': 0,
            'news_sentiment_range_y': 0,
            'news_sentiment_std': 0,
            'news_sentiment_std_x': 0,
            'news_sentiment_std_y': 0,

            'buy': 5,
            'hold': 10,
            'sell': 2,
            'strongBuy': 3,
            'strongSell': 1,

            'volume_price_momentum': 0.0,
        }

    def _create_symbol_profiles(self, df):
        """Create profiles for each symbol to guide intelligent filling"""
        # Column-level marker; identical for every symbol in the frame.
        has_performance_cols = any(
            isinstance(col, str) and col.startswith('performance.') for col in df.columns
        )

        profiles = {}
        for symbol in df['symbol'].unique():
            symbol_data = df[df['symbol'] == symbol]

            has_rank = 'rank' in symbol_data.columns and symbol_data['rank'].notna().any()
            is_crypto = bool(has_rank) or has_performance_cols

            marketcap = symbol_data.get(
                'marketcap',
                symbol_data.get('marketCapitalization', pd.Series([1000000000])),
            )

            profiles[symbol] = {
                'symbol': symbol,
                'is_crypto': is_crypto,
                'total_records': len(symbol_data),
                'data_density': symbol_data.notna().mean().mean(),
                'has_price_data': 'price' in symbol_data.columns and symbol_data['price'].notna().any(),
                'typical_price': symbol_data.get('price', pd.Series([100])).median(),
                'typical_volume': symbol_data.get('volume', pd.Series([1000000])).median(),
                'typical_marketcap': marketcap.median(),
            }

        return profiles

    def _intelligent_fill_value(self, df, symbol, column, default_value):
        """Generate intelligent fill value based on symbol context.

        Non-zero numeric defaults are scaled by a factor in [-5%, +5%) derived
        reproducibly from ``symbol`` and ``column``; None, zero and non-numeric
        defaults are returned unchanged. ``df`` is unused and kept only for
        interface compatibility.
        """
        if default_value is None:
            return None

        if isinstance(default_value, (int, float)):
            if default_value == 0:
                return 0
            bucket = self._stable_bucket(f"{symbol}_{column}", 1000)
            variation_factor = (bucket / 1000.0 - 0.5) * 0.1
            return default_value * (1 + variation_factor)

        return default_value

    @staticmethod
    def _apply_fill(df_result, symbol_mask, column, fill_result):
        """Write ``fill_result`` into the null cells of ``column`` for one symbol.

        Args:
            df_result: Frame modified in place.
            symbol_mask: Boolean Series (indexed like ``df_result``) selecting
                the symbol's rows.
            column: Column to fill.
            fill_result: None (no-op), a scalar, or a Series with one entry per
                selected row in row order.

        Only cells that are currently null are written; values are matched to
        rows by position, never by index label.
        """
        if fill_result is None:
            return

        fill_mask = symbol_mask & df_result[column].isna()
        if not fill_mask.any():
            return

        if isinstance(fill_result, pd.Series):
            if len(fill_result) == int(symbol_mask.sum()):
                needs_fill = fill_mask[symbol_mask].to_numpy()
                df_result.loc[fill_mask, column] = fill_result.to_numpy()[needs_fill]
            else:
                # Unexpected shape: fall back to a single representative value.
                fill_value = fill_result.median()
                if pd.notna(fill_value):
                    df_result.loc[fill_mask, column] = fill_value
        else:
            df_result.loc[fill_mask, column] = fill_result

    def _fill_exchange_prices_advanced(self, df):
        """Advanced exchange price filling using symbol-first strategy.

        For each symbol, null ``exchangePrices.*`` cells are filled from that
        exchange's own series when it has data, otherwise from the symbol's
        main ``price`` (median) with a reproducible variation of at most
        +/-0.5% per exchange. Cross-symbol estimates are not used, since price
        levels of different symbols are not comparable.
        """
        exchange_price_cols = [
            col for col in df.columns
            if isinstance(col, str) and col.startswith('exchangePrices.')
        ]
        if not exchange_price_cols or 'price' not in df.columns:
            return df

        df_result = df.copy()

        for symbol in df['symbol'].unique():
            symbol_mask = df['symbol'] == symbol

            price_estimate = self._intelligent_symbol_fill(
                df, symbol, 'price', allow_similar=False
            )
            if isinstance(price_estimate, pd.Series):
                main_price = price_estimate.median()
            else:
                main_price = price_estimate
            if main_price is None or pd.isna(main_price):
                continue

            for exchange_col in exchange_price_cols:
                if not df.loc[symbol_mask, exchange_col].isna().any():
                    continue

                estimate = self._intelligent_symbol_fill(
                    df, symbol, exchange_col, allow_similar=False
                )
                if estimate is None:
                    bucket = self._stable_bucket(f"{symbol}_{exchange_col}", 100)
                    variation = (bucket / 100.0 - 0.5) * 0.01
                    estimate = main_price * (1 + variation)

                self._apply_fill(df_result, symbol_mask, exchange_col, estimate)

        return df_result

    @classmethod
    def _format_exchange_symbol(cls, exchange_col, symbol):
        """Return the ticker string for ``symbol`` on the exchange of ``exchange_col``."""
        ticker = symbol.upper()
        if exchange_col == 'symbols.binance':
            return "BTCUSDT" if symbol.lower() == 'bitcoin' else f"{ticker}USDT"
        if exchange_col == 'symbols.kraken':
            return f"{ticker}USD" if len(symbol) <= 3 else f"{ticker}/USD"
        template = cls._EXCHANGE_SYMBOL_TEMPLATES.get(exchange_col)
        return template.format(ticker) if template else ticker

    def _fill_exchange_symbols(self, df):
        """Fill exchange symbols with main symbol + exchange-specific formatting.

        A ``symbols.*`` column is filled for a symbol only when every row of
        that symbol is null in it. Null or non-string symbols are skipped.
        """
        exchange_symbol_cols = [
            col for col in df.columns
            if isinstance(col, str) and col.startswith('symbols.')
        ]
        if not exchange_symbol_cols or 'symbol' not in df.columns:
            return df

        df_result = df.copy()

        for symbol in df['symbol'].unique():
            # A missing symbol cannot be formatted into a ticker.
            if not isinstance(symbol, str):
                continue
            symbol_mask = df['symbol'] == symbol

            for exchange_col in exchange_symbol_cols:
                if df.loc[symbol_mask, exchange_col].isna().all():
                    df_result.loc[symbol_mask, exchange_col] = self._format_exchange_symbol(
                        exchange_col, symbol
                    )

        return df_result

    def _fill_cg_id(self, df):
        """Fill CoinGecko ID based on symbol.

        Known symbols are mapped through a fixed table; unknown ones fall back
        to the lower-cased symbol. Only symbols whose ``cg_id`` is null on
        every row are filled. Null or non-string symbols are skipped.
        """
        if 'cg_id' not in df.columns:
            return df

        df_result = df.copy()

        for symbol in df['symbol'].unique():
            if not isinstance(symbol, str):
                continue
            symbol_mask = df['symbol'] == symbol

            if df.loc[symbol_mask, 'cg_id'].isna().all():
                key = symbol.lower()
                df_result.loc[symbol_mask, 'cg_id'] = self._CG_ID_MAPPING.get(key, key)

        return df_result

    def _fill_priority_columns(self, df_result, priority_columns):
        """Run the symbol-first strategies over ``priority_columns`` in place."""
        for column in priority_columns:
            if column not in df_result.columns or not df_result[column].isna().any():
                continue

            print(f"Processing {column} with symbol-first strategy...")

            for symbol in df_result['symbol'].unique():
                symbol_mask = df_result['symbol'] == symbol
                if not (symbol_mask & df_result[column].isna()).any():
                    continue

                fill_result = self._intelligent_symbol_fill(df_result, symbol, column)
                self._apply_fill(df_result, symbol_mask, column, fill_result)

    def _fill_from_defaults(self, df_result, defaults):
        """Fill remaining nulls in place from ``defaults``, varied per symbol.

        Columns without an entry (or with a None entry) in ``defaults`` are
        left untouched. A failure on one column/symbol is reported and skipped.
        """
        for column in df_result.columns:
            if not df_result[column].isna().any():
                continue

            default_value = defaults.get(column)
            if default_value is None:
                continue

            for symbol in df_result['symbol'].unique():
                fill_mask = (df_result['symbol'] == symbol) & df_result[column].isna()
                if not fill_mask.any():
                    continue

                try:
                    fill_value = self._intelligent_fill_value(
                        df_result, symbol, column, default_value
                    )
                    df_result.loc[fill_mask, column] = fill_value
                except Exception as e:
                    print(f"Warning: Failed to fill {column} for {symbol}: {e}")

    def _prepare_symbol_context(self, df_result):
        """Compute and store per-symbol statistics and profiles for ``df_result``."""
        print("Analyzing symbol statistics...")
        self.symbol_stats = self._analyze_symbol_statistics(df_result)
        print(f"Analyzed {len(self.symbol_stats)} symbols")

        self.symbol_profiles = self._create_symbol_profiles(df_result)

    def process_crypto_features(self, df):
        """Process crypto features with advanced symbol-first null handling.

        Returns a filled copy of ``df``; the input frame is not modified.
        """
        print("Processing crypto features with symbol-first strategy...")
        df_result = df.copy()

        self._prepare_symbol_context(df_result)

        priority_columns = [
            'price', 'volume', 'marketcap', 'dominance', 'rank',
            'performance.day', 'performance.week', 'performance.month',
            'rsi', 'macd', 'transaction_count', 'transaction_volume',
        ]
        self._fill_priority_columns(df_result, priority_columns)

        df_result = self._fill_exchange_prices_advanced(df_result)
        df_result = self._fill_exchange_symbols(df_result)
        df_result = self._fill_cg_id(df_result)

        self._fill_from_defaults(df_result, self.crypto_column_defaults)

        return df_result

    def process_stock_features(self, df):
        """Process stock features with advanced symbol-first null handling.

        Returns a filled copy of ``df``; the input frame is not modified.
        """
        print("Processing stock features with symbol-first strategy...")
        df_result = df.copy()

        self._prepare_symbol_context(df_result)

        priority_columns = [
            'close', 'open', 'high', 'low', 'volume', 'prev_close',
            'marketCapitalization', 'shareOutstanding',
            'rsi', 'macd', 'atr', 'bb_position',
            'news_sentiment_mean_x', 'news_sentiment_mean_y',
            'buy', 'sell', 'hold', 'strongBuy', 'strongSell',
        ]
        self._fill_priority_columns(df_result, priority_columns)

        self._fill_from_defaults(df_result, self.stock_column_defaults)

        return df_result

    def generate_report(self, df_before, df_after, feature_type):
        """Generate a comprehensive report of null value handling with symbol-first strategy details.

        Args:
            df_before: Frame before filling.
            df_after: Frame after filling (same columns as ``df_before``).
            feature_type: Label stored in the report (e.g. 'crypto', 'stock').

        Returns:
            dict of plain Python ints/floats/strings, suitable for ``json.dump``.
        """
        def count_nulls(frame):
            return int(frame.isnull().sum().sum())

        before_nulls = df_before.isnull().sum()
        after_nulls = df_after.isnull().sum()

        null_reduction = before_nulls - after_nulls
        columns_fixed = null_reduction[null_reduction > 0]

        symbol_analysis = {}
        if 'symbol' in df_before.columns:
            for symbol in df_before['symbol'].unique():
                rows_before = df_before[df_before['symbol'] == symbol]
                rows_after = df_after[df_after['symbol'] == symbol]
                symbol_before = count_nulls(rows_before)
                symbol_after = count_nulls(rows_after)
                # str() keeps the report JSON-serializable for non-string symbols.
                symbol_analysis[str(symbol)] = {
                    'nulls_before': symbol_before,
                    'nulls_after': symbol_after,
                    'nulls_filled': symbol_before - symbol_after,
                    'records': int(len(rows_before)),
                }

        temporal_analysis = {}
        if 'interval_timestamp' in df_before.columns:
            dates_before = pd.to_datetime(df_before['interval_timestamp'], unit='ms').dt.date
            dates_after = pd.to_datetime(df_after['interval_timestamp'], unit='ms').dt.date

            for date in dates_before.unique():
                date_before = count_nulls(df_before[dates_before == date])
                date_after = count_nulls(df_after[dates_after == date])
                temporal_analysis[str(date)] = {
                    'nulls_before': date_before,
                    'nulls_after': date_after,
                    'nulls_filled': date_before - date_after,
                }

        total_before = before_nulls.sum()
        total_filled = null_reduction.sum()

        report = {
            'feature_type': feature_type,
            'timestamp': pd.Timestamp.now().isoformat(),
            'strategy': 'symbol-first-temporal-interpolation',
            'total_rows': int(len(df_after)),
            'total_columns': int(len(df_after.columns)),
            'unique_symbols': int(len(df_after['symbol'].unique())) if 'symbol' in df_after.columns else 0,
            'columns_with_nulls_before': int((before_nulls > 0).sum()),
            'columns_with_nulls_after': int((after_nulls > 0).sum()),
            'total_nulls_before': int(total_before),
            'total_nulls_after': int(after_nulls.sum()),
            'total_nulls_filled': int(total_filled),
            'columns_fixed': int(len(columns_fixed)),
            'null_reduction_rate': float((total_filled / total_before) if total_before > 0 else 0),
            'remaining_null_columns': {
                str(k): int(v) for k, v in after_nulls[after_nulls > 0].to_dict().items()
            },
            'fixed_columns_detail': {
                str(k): int(v) for k, v in columns_fixed.to_dict().items()
            },
            'symbol_analysis': symbol_analysis,
            'temporal_analysis': temporal_analysis,
            'strategy_details': {
                'symbol_stats_analyzed': len(self.symbol_stats),
                'temporal_interpolation_used': True,
                'similar_symbol_fallback': True,
                'intelligent_defaults': True,
            },
        }

        return report
|
|
|
|
|
def process_crypto_features_file(input_path, output_path=None):
    """Process crypto features file.

    Reads the parquet file at ``input_path``, fills nulls with
    ``FinalNullValueHandler.process_crypto_features``, writes the result to
    ``output_path`` and a JSON report next to it.

    Args:
        input_path: Path of the parquet file to read.
        output_path: Destination parquet path; defaults to ``input_path``
            (the input file is overwritten).

    Returns:
        Tuple ``(df_processed, report)``.
    """
    if output_path is None:
        output_path = input_path

    print(f"Loading crypto features from {input_path}...")
    df = pd.read_parquet(input_path)

    print(f"Loaded {len(df)} rows with {len(df.columns)} columns")
    print(f"Null values before processing: {df.isnull().sum().sum()}")

    handler = FinalNullValueHandler()
    df_processed = handler.process_crypto_features(df)

    print(f"Null values after processing: {df_processed.isnull().sum().sum()}")

    report = handler.generate_report(df, df_processed, 'crypto')

    df_processed.to_parquet(output_path, index=False)
    print(f"Saved processed crypto features to {output_path}")

    # Build the report name from the file name only. A plain str.replace on
    # the whole path would return the data path itself when it has no
    # '.parquet' (the report would then overwrite the data) and would also
    # rewrite '.parquet' inside directory names.
    output_file = Path(output_path)
    base_name = output_file.stem if output_file.suffix == '.parquet' else output_file.name
    report_path = output_file.with_name(f"{base_name}_null_handling_report.json")
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)
    print(f"Saved processing report to {report_path}")

    return df_processed, report
|
|
|
|
|
def process_stock_features_file(input_path, output_path=None):
    """Process stock features file.

    Reads the parquet file at ``input_path``, fills nulls with
    ``FinalNullValueHandler.process_stock_features``, writes the result to
    ``output_path`` and a JSON report next to it.

    Args:
        input_path: Path of the parquet file to read.
        output_path: Destination parquet path; defaults to ``input_path``
            (the input file is overwritten).

    Returns:
        Tuple ``(df_processed, report)``.
    """
    if output_path is None:
        output_path = input_path

    print(f"Loading stock features from {input_path}...")
    df = pd.read_parquet(input_path)

    print(f"Loaded {len(df)} rows with {len(df.columns)} columns")
    print(f"Null values before processing: {df.isnull().sum().sum()}")

    handler = FinalNullValueHandler()
    df_processed = handler.process_stock_features(df)

    print(f"Null values after processing: {df_processed.isnull().sum().sum()}")

    report = handler.generate_report(df, df_processed, 'stock')

    df_processed.to_parquet(output_path, index=False)
    print(f"Saved processed stock features to {output_path}")

    # Build the report name from the file name only. A plain str.replace on
    # the whole path would return the data path itself when it has no
    # '.parquet' (the report would then overwrite the data) and would also
    # rewrite '.parquet' inside directory names.
    output_file = Path(output_path)
    base_name = output_file.stem if output_file.suffix == '.parquet' else output_file.name
    report_path = output_file.with_name(f"{base_name}_null_handling_report.json")
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)
    print(f"Saved processing report to {report_path}")

    return df_processed, report
|
|
|
|
|
def main():
    """Main function to process both crypto and stock features.

    Each feature file that exists is processed in place (no separate output
    path is passed). A failure on one file is reported and does not prevent
    the other from being processed. A summary of all successfully processed
    files is printed at the end.
    """
    # (summary label, noun used in messages, file path, processing function)
    jobs = (
        ('crypto', 'crypto', Path("data/merged/features/crypto_features.parquet"),
         process_crypto_features_file),
        ('stocks', 'stock', Path("data/merged/features/stocks_features.parquet"),
         process_stock_features_file),
    )

    processed_files = []

    for file_type, noun, file_path, processor in jobs:
        if not file_path.exists():
            print(f"Warning: {file_path} not found")
            continue

        try:
            _, report = processor(file_path)
            processed_files.append((file_type, file_path, report))
            # Distinct ASCII markers: the original success and failure markers
            # had both degraded to the same mis-encoded character.
            print(f"[OK] {noun.capitalize()} features processed: "
                  f"{report['total_nulls_filled']} nulls filled")
        except Exception as e:
            # Top-level boundary: report and carry on with the next file.
            print(f"[ERROR] Error processing {noun} features: {e}")

    if not processed_files:
        print("No files were processed successfully.")
        return

    print("\n" + "=" * 60)
    print("FINAL NULL VALUE HANDLING SUMMARY")
    print("=" * 60)

    total_nulls_filled = 0
    for file_type, file_path, report in processed_files:
        total_nulls_filled += report['total_nulls_filled']
        print(f"\n{file_type.upper()} FEATURES:")
        print(f" File: {file_path}")
        print(f" Rows: {report['total_rows']:,}")
        print(f" Columns: {report['total_columns']}")
        print(f" Nulls filled: {report['total_nulls_filled']:,}")
        print(f" Columns fixed: {report['columns_fixed']}")
        print(f" Remaining null columns: {len(report['remaining_null_columns'])}")

        if report['remaining_null_columns']:
            print(f" Still have nulls: {list(report['remaining_null_columns'].keys())}")

    print(f"\nTOTAL NULLS FILLED ACROSS ALL FILES: {total_nulls_filled:,}")
    print("=" * 60)
|
|
|
|
|
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|