# advisorai-data-enhanced/src/merge/stocks_data_filler.py
import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')
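
# Note: the symbol-specific "noise" used below is derived from Python's built-in
# hash(), which is salted per process for strings (PYTHONHASHSEED), so the imputed
# fills are not reproducible across runs. A deterministic alternative, sketched
# here but not wired into the class, could hash with hashlib instead:
import hashlib

def stable_unit_hash(text: str) -> float:
    """Map a string to a deterministic value in [0, 1), stable across processes."""
    digest = hashlib.md5(text.encode("utf-8")).hexdigest()
    return int(digest[:8], 16) / 2**32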
class ImprovedStockDataImputer:
"""
Enhanced imputation that prevents data homogenization by using
symbol-specific patterns and relationships.
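
    Example (illustrative; expects a DataFrame with 'symbol' and
    'interval_timestamp' columns plus the numeric feature columns handled below):

        imputer = ImprovedStockDataImputer()
        df_filled = imputer.fit_transform(df)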
"""
def __init__(self, preserve_symbol_diversity=True):
self.preserve_symbol_diversity = preserve_symbol_diversity
self.symbol_profiles = {}
self.scalers = {}
def _create_symbol_profiles(self, df):
"""Create profiles for each symbol to guide imputation."""
profiles = {}
for symbol in df['symbol'].unique():
symbol_data = df[df['symbol'] == symbol]
# Calculate symbol-specific statistics with proper null handling
price_col = None
for col in ['price', 'close', 'close_alpaca', 'open', 'high', 'low']:
if col in symbol_data.columns and not symbol_data[col].isnull().all():
price_col = col
break
volume_col = None
for col in ['volume', 'volume_alpaca']:
if col in symbol_data.columns and not symbol_data[col].isnull().all():
volume_col = col
break
profile = {
'symbol': symbol,
'price_level': symbol_data[price_col].median() if price_col else 100.0, # Default to 100
'price_volatility': symbol_data[price_col].std() if price_col else 2.0, # Default volatility
'volume_level': symbol_data[volume_col].median() if volume_col else 1000.0, # Default volume
'is_crypto': symbol_data['is_crypto'].mode().iloc[0] if 'is_crypto' in symbol_data.columns and not symbol_data['is_crypto'].isnull().all() else 0,
'typical_rsi': symbol_data['rsi'].median() if 'rsi' in symbol_data.columns and not symbol_data['rsi'].isnull().all() else 50.0,
'data_availability': len(symbol_data) / len(df) if len(df) > 0 else 0
}
# Ensure no None values in profile
for key, value in profile.items():
if value is None or (isinstance(value, float) and np.isnan(value)):
if key == 'price_level':
profile[key] = 100.0
elif key == 'price_volatility':
profile[key] = 2.0
elif key == 'volume_level':
profile[key] = 1000.0
elif key == 'typical_rsi':
profile[key] = 50.0
elif key == 'is_crypto':
profile[key] = 0
else:
profile[key] = 0.0
profiles[symbol] = profile
return profiles
def _impute_with_symbol_context(self, df, column, symbol_profiles):
"""Impute values using symbol-specific context to prevent homogenization."""
df_result = df.copy()
for symbol in df['symbol'].unique():
symbol_mask = df['symbol'] == symbol
symbol_data = df.loc[symbol_mask, column]
if symbol_data.isnull().sum() == 0:
continue # No missing values for this symbol
profile = symbol_profiles.get(symbol, {})
# Strategy depends on column type and symbol characteristics
if column in ['price', 'open', 'high', 'low', 'close']:
# Price data - use interpolation with symbol-specific bounds
interpolated = symbol_data.interpolate(method='linear', limit_direction='both')
# If still missing, use symbol's typical price level with noise
if interpolated.isnull().any():
base_price = profile.get('price_level', 100.0)
volatility = profile.get('price_volatility', base_price * 0.02)
# Add symbol-specific noise to prevent identical values
symbol_hash = hash(symbol) % 1000 / 1000 # 0-1 range
noise_factor = (symbol_hash - 0.5) * 0.1 # -5% to +5%
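                    # e.g. symbol_hash 0.731 -> noise_factor +0.0231, so a base price
                    # of 100 is filled as ~102.31 for this symbol (illustrative numbers)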
adjusted_price = base_price * (1 + noise_factor)
interpolated = interpolated.fillna(adjusted_price)
df_result.loc[symbol_mask, column] = interpolated
elif column in ['volume', 'volume_alpaca']:
# Volume data - use forward fill then symbol-specific median
                filled = symbol_data.ffill().bfill()
if filled.isnull().any():
# Use symbol's typical volume with variation
base_volume = profile.get('volume_level', 1000.0)
symbol_hash = hash(symbol + column) % 1000 / 1000
volume_multiplier = 0.5 + symbol_hash # 0.5x to 1.5x variation
adjusted_volume = base_volume * volume_multiplier
filled = filled.fillna(adjusted_volume)
df_result.loc[symbol_mask, column] = filled
elif column in ['rsi', 'stoch_k', 'stoch_d']:
# Oscillator indicators - use symbol-specific typical values
symbol_median = symbol_data.median()
if pd.isna(symbol_median):
# Use symbol-specific baseline with variation
symbol_hash = hash(symbol + column) % 1000 / 1000
if column == 'rsi':
# RSI: 30-70 range with symbol variation
baseline = 30 + (symbol_hash * 40) # 30-70 range
else: # stochastic
baseline = 20 + (symbol_hash * 60) # 20-80 range
else:
baseline = symbol_median
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
elif column in ['macd', 'macd_signal', 'macd_histogram']:
# MACD - can be positive/negative, use symbol-specific pattern
symbol_median = symbol_data.median()
if pd.isna(symbol_median):
# Use price-level dependent MACD estimation with null safety
price_level = profile.get('price_level', 100.0) # Default to 100 if None
if price_level is None or np.isnan(price_level):
price_level = 100.0
symbol_hash = hash(symbol + column) % 2000 / 1000 - 1 # -1 to +1
# Scale MACD relative to price level
baseline = (price_level * 0.001) * symbol_hash
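                    # e.g. price_level 150.0 with symbol_hash +0.4 gives baseline 0.06,
                    # and symbol_hash -0.8 gives -0.12 (illustrative numbers)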
else:
baseline = symbol_median
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
else:
# Generic numeric imputation with symbol variation
symbol_median = symbol_data.median()
if pd.isna(symbol_median):
# Use overall median but add symbol-specific variation
overall_median = df[column].median()
if pd.isna(overall_median):
overall_median = 0
# Add symbol-specific variation (±10%)
symbol_hash = hash(symbol + column) % 2000 / 1000 - 1 # -1 to +1
variation = overall_median * 0.1 * symbol_hash
baseline = overall_median + variation
else:
baseline = symbol_median
df_result.loc[symbol_mask, column] = symbol_data.fillna(baseline)
return df_result[column]
def fit_transform(self, df):
"""Apply improved imputation with anti-homogenization measures."""
df_imputed = df.copy()
df_imputed = df_imputed.sort_values(['symbol', 'interval_timestamp'])
# Create symbol profiles
self.symbol_profiles = self._create_symbol_profiles(df_imputed)
print(f"Created profiles for {len(self.symbol_profiles)} unique symbols")
# 1. Handle categorical/flag columns (same as before)
categorical_cols = [
'symbol', 'stock_market', 'is_crypto', 'is_stock', 'is_other',
'alpaca_data_available', 'is_trading_hours', 'is_weekend'
]
for col in categorical_cols:
if col in df_imputed.columns:
                df_imputed[col] = df_imputed.groupby('symbol')[col].ffill().bfill()
# 2. Price and volume data - symbol-specific imputation
price_volume_cols = [
'price', 'open', 'high', 'low', 'close', 'volume',
'open_alpaca', 'high_alpaca', 'low_alpaca', 'close_alpaca', 'volume_alpaca',
'bid_price', 'ask_price', 'bid_price_alpaca', 'ask_price_alpaca', 'price_alpaca'
]
for col in price_volume_cols:
if col in df_imputed.columns and df_imputed[col].isnull().any():
print(f"Imputing {col} with symbol-specific context...")
df_imputed[col] = self._impute_with_symbol_context(
df_imputed, col, self.symbol_profiles
)
# 3. Technical indicators - symbol-specific imputation
tech_indicators = [
'rsi', 'macd', 'macd_signal', 'macd_histogram', 'atr', 'bb_position',
'stoch_k', 'stoch_d', 'cci', 'roc_5', 'roc_10', 'mfi', 'rsi_macd_signal',
'ema_convergence', 'true_range_pct'
]
for col in tech_indicators:
if col in df_imputed.columns and df_imputed[col].isnull().any():
print(f"Imputing {col} with symbol-specific context...")
df_imputed[col] = self._impute_with_symbol_context(
df_imputed, col, self.symbol_profiles
)
# 4. Volume/price change features - symbol-specific
change_features = [
'price_change_1', 'price_change_7', 'price_change_14', 'volume_ratio',
'volatility_7', 'price_volume_trend', 'volatility_consistency'
]
for col in change_features:
if col in df_imputed.columns and df_imputed[col].isnull().any():
df_imputed[col] = self._impute_with_symbol_context(
df_imputed, col, self.symbol_profiles
)
# 5. On-chain features (crypto only)
onchain_features = [
'total_fees', 'total_gas_used', 'avg_gas_price', 'tx_count_7d_change',
'tx_count_sma_7', 'tx_volume_7d_change', 'tx_volume_sma_7',
'gas_used_7d_change', 'gas_used_sma_7', 'gas_price_7d_change',
'gas_price_sma_7', 'fees_7d_change', 'avg_tx_size'
]
for col in onchain_features:
if col in df_imputed.columns and df_imputed[col].isnull().any():
# Only impute for crypto assets
crypto_mask = df_imputed['is_crypto'] == 1
non_crypto_mask = df_imputed['is_crypto'] != 1
if crypto_mask.any():
crypto_data = df_imputed.loc[crypto_mask]
crypto_imputed = self._impute_with_symbol_context(
crypto_data, col, self.symbol_profiles
)
df_imputed.loc[crypto_mask, col] = crypto_imputed
# Fill non-crypto with 0
df_imputed.loc[non_crypto_mask, col] = df_imputed.loc[non_crypto_mask, col].fillna(0)
# 6. Handle remaining columns with simple strategies
remaining_strategies = {
'quality_metrics': [
'data_quality_score', 'core_features_completeness', 'technical_indicators_completeness',
'onchain_features_completeness', 'price_data_completeness',
'overall_feature_completeness', 'data_completeness_score'
],
'news_sentiment': [
'news_sentiment_mean', 'news_sentiment_std', 'news_sentiment_min',
'news_sentiment_max', 'news_sentiment_range', 'news_match_score_mean',
'news_match_score_max', 'news_mentions_count', 'news_articles_count',
'news_highlights_count', 'news_activity_score', 'sentiment_score'
],
'zero_fill': [
'trade_count', 'trade_count_alpaca', 'bid_size', 'ask_size',
'bid_size_alpaca', 'ask_size_alpaca', 'size', 'size_alpaca'
]
}
# Quality metrics - use median but add small variation
for col in remaining_strategies['quality_metrics']:
if col in df_imputed.columns and df_imputed[col].isnull().any():
median_val = df_imputed[col].median()
if pd.isna(median_val):
median_val = 0.5 # Default for quality metrics
median_val = np.clip(median_val, 0, 1)
# Add tiny symbol-specific variation
for symbol in df_imputed['symbol'].unique():
mask = df_imputed['symbol'] == symbol
symbol_hash = hash(symbol + col) % 100 / 10000 # Very small variation
fill_val = np.clip(median_val + symbol_hash, 0, 1)
df_imputed.loc[mask, col] = df_imputed.loc[mask, col].fillna(fill_val)
# News sentiment - neutral with symbol variation
for col in remaining_strategies['news_sentiment']:
if col in df_imputed.columns and df_imputed[col].isnull().any():
if 'sentiment' in col.lower():
# Slight variation around neutral
for symbol in df_imputed['symbol'].unique():
mask = df_imputed['symbol'] == symbol
symbol_hash = (hash(symbol + col) % 200 / 1000) - 0.1 # -0.1 to +0.1
df_imputed.loc[mask, col] = df_imputed.loc[mask, col].fillna(symbol_hash)
elif 'count' in col.lower():
df_imputed[col] = df_imputed[col].fillna(0)
else:
median_val = df_imputed[col].median()
if pd.isna(median_val):
median_val = 0
df_imputed[col] = df_imputed[col].fillna(median_val)
# Zero fill
for col in remaining_strategies['zero_fill']:
if col in df_imputed.columns:
df_imputed[col] = df_imputed[col].fillna(0)
# Handle any remaining columns
remaining_numeric = df_imputed.select_dtypes(include=[np.number]).columns
remaining_with_nulls = [col for col in remaining_numeric if df_imputed[col].isnull().any()]
for col in remaining_with_nulls:
if col not in ['id', 'id_alpaca', 'backup_id']:
print(f"Imputing remaining column: {col}")
df_imputed[col] = self._impute_with_symbol_context(
df_imputed, col, self.symbol_profiles
)
print("[INFO] Imputation complete with anti-homogenization measures")
print(f"[INFO] Final null counts: {df_imputed.isnull().sum().sum()}")
return df_imputed
# Usage function with validation
def impute_with_validation(file_path, output_path=None):
"""Impute data and validate no homogenization occurred."""
try:
print(f"[INFO] Loading data from: {file_path}")
df = pd.read_parquet(file_path)
print(f"[INFO] Loaded data shape: {df.shape}")
print(f"[INFO] Initial null counts: {df.isnull().sum().sum()}")
except Exception as e:
print(f"[ERROR] Failed to load data: {e}")
return None
# Sample symbols for validation
symbols_sample = df['symbol'].unique()[:5]
print(f"[INFO] Processing {len(df['symbol'].unique())} unique symbols")
# Initialize and run imputer
imputer = ImprovedStockDataImputer()
df_imputed = imputer.fit_transform(df)
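    # Minimal anti-homogenization spot check (illustrative): verify the sampled
    # symbols do not all collapse to an identical median close after imputation.
    if 'close' in df_imputed.columns:
        sample_medians = (
            df_imputed[df_imputed['symbol'].isin(symbols_sample)]
            .groupby('symbol')['close'].median()
        )
        print(f"[INFO] Median close for sampled symbols:\n{sample_medians}")
        if len(sample_medians) > 1 and sample_medians.nunique() == 1:
            print("[WARN] Sampled symbols share an identical median close; review imputation")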
# Combine alpaca data with main data where available
alpaca_combinations = [
('high', 'high_alpaca'),
('low', 'low_alpaca'),
('close', 'close_alpaca'),
('open', 'open_alpaca'),
('volume', 'volume_alpaca')
]
for main_col, alpaca_col in alpaca_combinations:
if main_col in df_imputed.columns and alpaca_col in df_imputed.columns:
df_imputed[main_col] = df_imputed[main_col].combine_first(df_imputed[alpaca_col])
print(f"[INFO] Combined {main_col} with {alpaca_col}")
# Drop unwanted columns before saving
drop_cols = [
'_filename', '_original_format', 'alpaca_data_available',
'ask_exchange', 'ask_exchange_alpaca',
'bid_exchange', 'bid_exchange_alpaca',
'conditions', 'conditions_alpaca', 'conditions_trade', 'conditions_trade_alpaca',
'symbol_quote', 'symbol_quote_alpaca', 'symbol_trade', 'symbol_trade_alpaca',
'tape', 'tape_alpaca', 'tape_trade', 'tape_trade_alpaca',
'id', 'id_alpaca',
'is_new_symbol', 'price', 'timestamp_dt',
'alpaca_merge_timestamp', 'timestamp', 'timestamp_alpaca',
'estimateCurrency', 'exchange', 'exchange_alpaca', 'exchange_company',
'finnhubIndustry', 'headline',
'sentiment_timestamp', 'logo',
'ticker', 'stock_market',
'weburl', 'latest_news_timestamp', 'day_of_week', 'feature_timestamp',
'interval_timestamp_dt', 'is_crypto', 'is_other', 'is_stock',
'country', 'currency', 'datetime', 'ipo', 'name', 'period', 'phone',
'year', 'month', 'latest_news_timestamp_x', 'latest_news_timestamp_y'
]
original_cols = len(df_imputed.columns)
    df_imputed = df_imputed.drop(columns=[c for c in drop_cols if c in df_imputed.columns])
print(f"[INFO] Dropped {original_cols - len(df_imputed.columns)} unwanted columns")
# Reorder columns: 'symbol' first, 'interval_timestamp' second, rest follow
cols = list(df_imputed.columns)
if 'symbol' in cols and 'interval_timestamp' in cols:
rest = [c for c in cols if c not in ['symbol', 'interval_timestamp']]
df_imputed = df_imputed[['symbol', 'interval_timestamp'] + rest]
print("[INFO] Reordered columns with symbol and interval_timestamp first")
# Save results
if output_path:
# Clean up data types
if 'backup_id' in df_imputed.columns:
df_imputed['backup_id'] = df_imputed['backup_id'].astype(str)
try:
df_imputed.to_parquet(output_path, compression='snappy')
print(f"[INFO] Successfully saved imputed data to: {output_path}")
except Exception as e:
print(f"[ERROR] Failed to save data: {e}")
return None
print(f"[INFO] Final dataset shape: {df_imputed.shape}")
return df_imputed
# Example usage
def main():
input_file = "data/merged/features/stocks_features.parquet"
output_file = input_file
print("[INFO] Starting stock data imputation process...")
df_clean = impute_with_validation(input_file, output_file)
if df_clean is not None:
print(f"[INFO] Data imputation completed successfully!")
print(f"[INFO] Final shape: {df_clean.shape}")
print(f"[INFO] Remaining nulls: {df_clean.isnull().sum().sum()}")
# Quick validation
print("\n=== VALIDATION SUMMARY ===")
print(f"Unique symbols: {df_clean['symbol'].nunique()}")
if 'close' in df_clean.columns:
print(f"Price range: ${df_clean['close'].min():.2f} - ${df_clean['close'].max():.2f}")
if 'volume' in df_clean.columns:
print(f"Volume range: {df_clean['volume'].min():.0f} - {df_clean['volume'].max():.0f}")
else:
print("[ERROR] Failed to load or impute data.")
if __name__ == "__main__":
main()