"""
Final Null Handler Integration Script

Integrates the final null value handler into the existing merge pipeline.
"""

import sys
from pathlib import Path

import numpy as np
import pandas as pd

from final_null_handler import (
    FinalNullValueHandler,
    process_crypto_features_file,
    process_stock_features_file,
)


def run_final_null_handling():
    """Run the final null value handling on all feature files."""
    print("=" * 60)
    print("STARTING FINAL NULL VALUE HANDLING")
    print("=" * 60)

    base_path = Path("data/merged/features")

    files_to_process = [
        ("crypto_features.parquet", "crypto"),
        ("stocks_features.parquet", "stock"),
        ("merged_features.parquet", "merged"),
    ]

    results = {}

    for filename, file_type in files_to_process:
        file_path = base_path / filename

        if not file_path.exists():
            print(f"[WARNING] {filename} not found, skipping...")
            continue

        print(f"\n[INFO] Processing {filename}...")

        try:
            if file_type == "crypto":
                df_processed, report = process_crypto_features_file(file_path)
            elif file_type == "stock":
                df_processed, report = process_stock_features_file(file_path)
            elif file_type == "merged":
                df_processed, report = process_merged_features_file(file_path)

            results[file_type] = {
                'success': True,
                'file_path': file_path,
                'report': report,
                'rows': len(df_processed),
                'nulls_filled': report['total_nulls_filled'],
            }

            print(f"[SUCCESS] {filename} processed successfully!")
            print(f"   - Rows: {len(df_processed):,}")
            print(f"   - Nulls filled: {report['total_nulls_filled']:,}")

        except Exception as e:
            print(f"[ERROR] Error processing {filename}: {e}")
            results[file_type] = {
                'success': False,
                'error': str(e),
                'file_path': file_path,
            }

    return results
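
# Shape of the mapping returned above (values are illustrative only):
#
#     {
#         'crypto': {'success': True, 'file_path': Path('...'),
#                    'report': {...}, 'rows': 12345, 'nulls_filled': 678},
#         'stock': {'success': False, 'error': '...', 'file_path': Path('...')},
#     }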


def process_merged_features_file(file_path):
    """Process merged features file (contains both crypto and stock data)."""
    print(f"Loading merged features from {file_path}...")
    df = pd.read_parquet(file_path)

    # Keep an untouched copy for the before/after report: the mixed-data
    # path below writes processed rows back into df in place.
    df_original = df.copy()

    print(f"Loaded {len(df)} rows with {len(df.columns)} columns")
    print(f"Null values before processing: {df.isnull().sum().sum()}")

    handler = FinalNullValueHandler()

    if 'symbol' in df.columns:
        # Columns that appear in only one of the two source datasets are used
        # to detect which kind of rows the merged file contains.
        crypto_indicators = ['rank', 'dominance', 'performance.day', 'exchangePrices.binance']
        stock_indicators = ['news_activity_score_x', 'strongBuy', 'marketCapitalization']

        has_crypto_cols = any(col in df.columns for col in crypto_indicators)
        has_stock_cols = any(col in df.columns for col in stock_indicators)

        if has_crypto_cols and has_stock_cols:
            print("Detected mixed crypto/stock data - processing intelligently...")

            # Rows with any crypto indicator populated are treated as crypto.
            # Build the mask defensively: only one of the indicator columns is
            # guaranteed to exist, so indexing both unconditionally could
            # raise a KeyError.
            crypto_mask = pd.Series(False, index=df.index)
            for col in ('rank', 'dominance'):
                if col in df.columns:
                    crypto_mask |= df[col].notna()

            if crypto_mask.any():
                print(f"Processing {crypto_mask.sum()} rows as crypto data...")
                df_crypto = df[crypto_mask].copy()
                df_crypto_processed = handler.process_crypto_features(df_crypto)
                # .loc write-back aligns on the index preserved by the copy.
                df.loc[crypto_mask] = df_crypto_processed

            stock_mask = ~crypto_mask
            if stock_mask.any():
                print(f"Processing {stock_mask.sum()} rows as stock data...")
                df_stock = df[stock_mask].copy()
                df_stock_processed = handler.process_stock_features(df_stock)
                df.loc[stock_mask] = df_stock_processed

            df_processed = df

        elif has_crypto_cols:
            print("Detected crypto-only data...")
            df_processed = handler.process_crypto_features(df)
        elif has_stock_cols:
            print("Detected stock-only data...")
            df_processed = handler.process_stock_features(df)
        else:
            print("Could not determine data type, applying generic processing...")
            df_processed = handler.process_stock_features(df)
    else:
        print("No symbol column found, applying generic processing...")
        df_processed = handler.process_stock_features(df)

    print(f"Null values after processing: {df_processed.isnull().sum().sum()}")

    report = handler.generate_report(df_original, df_processed, 'merged')

    df_processed.to_parquet(file_path, index=False)
    print(f"Saved processed merged features to {file_path}")

    return df_processed, report
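
# Toy illustration of the mixed-data split above (columns and values are made
# up for the example; FinalNullValueHandler itself is not exercised):
#
#     df = pd.DataFrame({
#         'symbol': ['BTC', 'AAPL'],
#         'rank': [1.0, None],         # crypto-only indicator
#         'strongBuy': [None, 12.0],   # stock-only indicator
#     })
#
# Here crypto_mask == [True, False]: the BTC row is routed through
# process_crypto_features and the AAPL row through process_stock_features.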


def validate_data_quality(results):
    """Validate that the data quality is maintained after null handling."""
    print("\n" + "=" * 60)
    print("DATA QUALITY VALIDATION")
    print("=" * 60)

    validation_results = {}

    for file_type, result in results.items():
        if not result.get('success', False):
            continue

        file_path = result['file_path']

        try:
            df = pd.read_parquet(file_path)

            validation = {
                'total_rows': len(df),
                'total_columns': len(df.columns),
                'remaining_nulls': df.isnull().sum().sum(),
                'duplicate_rows': df.duplicated().sum(),
                'infinite_values': np.isinf(df.select_dtypes(include=[np.number])).sum().sum(),
                'data_types_consistent': True,
            }

            # Flag columns whose values fall far outside a wide quantile fence.
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            extreme_values = {}

            for col in numeric_cols:
                col_data = df[col].dropna()
                if len(col_data) > 0:
                    q01, q99 = col_data.quantile([0.01, 0.99])
                    extreme_count = ((col_data < q01 - 10 * (q99 - q01)) |
                                     (col_data > q99 + 10 * (q99 - q01))).sum()
                    if extreme_count > 0:
                        extreme_values[col] = extreme_count

            validation['extreme_values'] = extreme_values
            validation['quality_score'] = calculate_quality_score(validation)

            validation_results[file_type] = validation

            print(f"\n{file_type.upper()} VALIDATION:")
            print(f"  - Rows: {validation['total_rows']:,}")
            print(f"  - Columns: {validation['total_columns']}")
            print(f"  - Remaining nulls: {validation['remaining_nulls']}")
            print(f"  - Duplicate rows: {validation['duplicate_rows']}")
            print(f"  - Infinite values: {validation['infinite_values']}")
            print(f"  - Quality score: {validation['quality_score']:.2%}")

            if extreme_values:
                print(f"  [WARNING] Extreme values detected in {len(extreme_values)} columns")

        except Exception as e:
            print(f"[ERROR] Validation failed for {file_type}: {e}")
            validation_results[file_type] = {'error': str(e)}

    return validation_results
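
# Worked example of the quantile fence above (numbers are illustrative):
# if a column has q01 = 0 and q99 = 100, the allowed band is
# [0 - 10 * 100, 100 + 10 * 100] = [-1000, 1100]; values outside it are
# counted as extreme.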


def calculate_quality_score(validation):
    """Calculate a simple quality score."""
    score = 1.0

    # Penalize remaining nulls (up to 0.5 of the score).
    if validation['total_rows'] > 0:
        null_ratio = validation['remaining_nulls'] / (validation['total_rows'] * validation['total_columns'])
        score -= null_ratio * 0.5

    # Penalize duplicate rows (up to 0.3 of the score).
    if validation['total_rows'] > 0:
        dup_ratio = validation['duplicate_rows'] / validation['total_rows']
        score -= dup_ratio * 0.3

    # Flat penalty if any infinite values remain.
    if validation['infinite_values'] > 0:
        score -= 0.1

    # Penalize the share of columns with extreme values (up to 0.2 of the score).
    extreme_columns = len(validation.get('extreme_values', {}))
    if extreme_columns > 0:
        score -= (extreme_columns / validation['total_columns']) * 0.2

    return max(0.0, score)
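
# Worked example (illustrative numbers): 1,000 rows x 50 columns with
# 500 remaining nulls, 10 duplicate rows, no infinite values, and extreme
# values in 5 columns:
#
#     null penalty:      500 / (1,000 * 50) * 0.5     = 0.005
#     duplicate penalty: 10 / 1,000 * 0.3             = 0.003
#     extreme penalty:   (5 / 50) * 0.2               = 0.020
#     quality score:     1.0 - 0.005 - 0.003 - 0.020  = 0.972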


def print_final_summary(results, validation_results):
    """Print final summary of the null handling process."""
    print("\n" + "=" * 60)
    print("FINAL NULL HANDLING SUMMARY")
    print("=" * 60)

    total_nulls_filled = sum(r.get('nulls_filled', 0) for r in results.values() if r.get('success'))
    successful_files = sum(1 for r in results.values() if r.get('success'))
    total_files = len(results)

    print("\n[INFO] PROCESSING RESULTS:")
    print(f"  Files processed: {successful_files}/{total_files}")
    print(f"  Total nulls filled: {total_nulls_filled:,}")

    print("\n[METRICS] QUALITY METRICS:")
    for file_type, validation in validation_results.items():
        if 'error' not in validation:
            print(f"  {file_type}: {validation['quality_score']:.1%} quality score")

    if successful_files == total_files:
        print("\n[SUCCESS] ALL FILES PROCESSED SUCCESSFULLY!")
    else:
        failed_files = total_files - successful_files
        print(f"\n[WARNING] {failed_files} files failed to process")

    print("\n[TIPS] RECOMMENDATIONS:")
    print("  - Review any remaining null columns in the reports")
    print("  - Monitor data quality scores in production")
    print("  - Consider additional validation rules if needed")

    print("\n" + "=" * 60)


def main():
    """Main function."""
    try:
        # numpy is already imported at module level, so no re-import or
        # globals() workaround is needed here.
        results = run_final_null_handling()

        validation_results = validate_data_quality(results)

        print_final_summary(results, validation_results)

        success_count = sum(1 for r in results.values() if r.get('success'))
        return 0 if success_count == len(results) else 1

    except Exception as e:
        print(f"[ERROR] Fatal error in null handling process: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())