|
|
|
""" |
|
Manual Null Handler - Standalone script for manual execution.

Use this when you need to handle null values without running the full pipeline.
|
""" |
|
|
|
import argparse |
|
import sys |
|
from pathlib import Path |
|
import pandas as pd |
|
from final_null_handler import process_crypto_features_file, process_stock_features_file, process_merged_features_file |
|
from run_final_null_handling import process_merged_features_file |
|
|
|
def _infer_file_type(input_path: Path) -> str:
    """Guess which feature set ``input_path`` holds.

    The file name is checked first (case-insensitively, in the order
    crypto / stock / merged). If the name gives no hint, the file's column
    names are inspected instead. Anything unrecognised or unreadable is
    treated as 'merged'.

    Args:
        input_path: Path to an existing parquet file.

    Returns:
        One of 'crypto', 'stocks' or 'merged'.
    """
    name = input_path.name.lower()
    if 'crypto' in name:
        return 'crypto'
    if 'stock' in name:
        return 'stocks'
    if 'merged' in name:
        return 'merged'

    try:
        # pandas.read_parquet has no ``nrows`` option; passing one makes the
        # call fail, so the file is read normally and only its column names
        # are used.
        columns = set(pd.read_parquet(input_path).columns)
    except Exception as e:
        # Detection is best effort: an unreadable file falls back to the
        # generic type, and the processing step reports the real failure.
        print(f"Warning: could not inspect {input_path} ({e}); assuming merged features")
        return 'merged'

    if 'rank' in columns or 'dominance' in columns:
        return 'crypto'
    if 'strongBuy' in columns or 'news_activity_score_x' in columns:
        return 'stocks'
    return 'merged'


def main() -> int:
    """Parse the command line and run null handling on the selected files.

    Files are chosen either explicitly with ``--input`` (optionally written
    to ``--output``) or from the default locations via ``--crypto``,
    ``--stocks``, ``--merged`` or ``--all``; default files are processed in
    place. With ``--dry-run`` each file is only read and its row and null
    counts are printed.

    Returns:
        0 on success; 1 if no selection flag was given, the ``--input`` file
        does not exist, no files were found, or processing a file failed
        (the remaining files are then not processed).
    """
    parser = argparse.ArgumentParser(description='Handle null values in feature files')
    parser.add_argument('--crypto', action='store_true', help='Process crypto features only')
    parser.add_argument('--stocks', action='store_true', help='Process stock features only')
    parser.add_argument('--merged', action='store_true', help='Process merged features only')
    parser.add_argument('--all', action='store_true', help='Process all feature files')
    parser.add_argument('--input', type=str, help='Input file path (overrides default paths)')
    parser.add_argument('--output', type=str, help='Output file path (defaults to input path)')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')

    args = parser.parse_args()

    # Keys double as the file type and as the name of the matching CLI flag.
    default_paths = {
        'crypto': Path("data/merged/features/crypto_features.parquet"),
        'stocks': Path("data/merged/features/stocks_features.parquet"),
        'merged': Path("data/merged/features/merged_features.parquet"),
    }

    if not any((args.crypto, args.stocks, args.merged, args.all, args.input)):
        print("Error: Must specify --crypto, --stocks, --merged, --all, or --input")
        parser.print_help()
        return 1

    # Each entry is (input_path, output_path, file_type).
    files_to_process = []

    if args.input:
        # An explicit input file takes precedence over the selection flags.
        input_path = Path(args.input)
        if not input_path.exists():
            print(f"Error: Input file {input_path} does not exist")
            return 1

        file_type = _infer_file_type(input_path)
        output_path = Path(args.output) if args.output else input_path
        files_to_process.append((input_path, output_path, file_type))
    else:
        for file_type, path in default_paths.items():
            if not (args.all or getattr(args, file_type)):
                continue
            if path.exists():
                files_to_process.append((path, path, file_type))
            else:
                print(f"Warning: {path} not found, skipping")

    if not files_to_process:
        print("Error: No files found to process")
        return 1

    print("=" * 60)
    print("MANUAL NULL VALUE HANDLER")
    print("=" * 60)

    if args.dry_run:
        print("DRY RUN MODE - No changes will be made")
        print()

    for input_path, output_path, file_type in files_to_process:
        print(f"\nProcessing: {input_path}")
        print(f"Type: {file_type}")
        print(f"Output: {output_path}")

        if args.dry_run:
            try:
                df = pd.read_parquet(input_path)
                null_count = df.isnull().sum().sum()
                print(f"Would process {len(df)} rows with {null_count} null values")
            except Exception as e:
                print(f"Error reading file: {e}")
            # Never fall through to the real processing step in a dry run.
            continue

        try:
            if file_type == 'crypto':
                df_processed, report = process_crypto_features_file(input_path, output_path)
            elif file_type == 'stocks':
                df_processed, report = process_stock_features_file(input_path, output_path)
            elif file_type == 'merged':
                # NOTE(review): only the input path is passed here, so
                # --output appears to have no effect for merged files;
                # confirm against process_merged_features_file's signature.
                # The name is imported twice at the top of the file; the
                # later import (run_final_null_handling) is the one bound.
                df_processed, report = process_merged_features_file(input_path)
            else:
                raise ValueError(f"Unknown file type: {file_type}")

            print(f"✅ Successfully processed {file_type} features:")
            print(f" - Rows: {len(df_processed):,}")
            print(f" - Nulls filled: {report['total_nulls_filled']:,}")
            print(f" - Columns fixed: {report['columns_fixed']}")

        except Exception as e:
            print(f"❌ Error processing {input_path}: {e}")
            return 1

    print("\n" + "=" * 60)
    print("MANUAL NULL HANDLING COMPLETED")
    print("=" * 60)

    return 0
|
|
|
if __name__ == "__main__":
    # Hand main()'s status code to the shell as the process exit code.
    sys.exit(main())
|
|