import subprocess
from pathlib import Path
import sys
import pandas as pd
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
import gc
import psutil
import os


# Memory optimization for merge operations
class MergeMemoryOptimizer:
    """Memory optimizer for merge operations"""

    def __init__(self, max_memory_mb=350):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process()

    def get_memory_usage(self):
        # Resident set size of this process, in MB
        return self.process.memory_info().rss / 1024 / 1024

    def cleanup_after_script(self, script_name):
        collected = gc.collect()
        memory_after = self.get_memory_usage()
        print(f"[MemOpt] After {script_name}: {memory_after:.1f}MB (freed {collected} objects)")
        if memory_after > self.max_memory_mb:
            print(f"[MemOpt] WARNING: High memory after {script_name}")
            # Additional cleanup attempt
            gc.collect()
        return memory_after


# Global memory optimizer instance
memory_optimizer = MergeMemoryOptimizer()

DAYS_OLD = 7
MERGED_PATH = Path("data/merged/features/merged_features.parquet")
ARCHIVE_DIR = Path("data/merged/archive")
ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)


def run_script(script, args=None):
    cmd = [sys.executable, str(Path(__file__).parent / script)]
    if args:
        cmd += args
    print(f"Running: {' '.join(cmd)}")

    # Check memory before running
    memory_before = memory_optimizer.get_memory_usage()
    print(f"[MemOpt] Before {script}: {memory_before:.1f}MB")

    result = subprocess.run(cmd, check=True)

    # Cleanup after running
    memory_optimizer.cleanup_after_script(script)
    return result


def archive_old_records():
    feature_files = [
        Path("data/merged/features/crypto_features.parquet"),
        Path("data/merged/features/stocks_features.parquet"),
    ]
    # Use a timezone-aware UTC timestamp so the millisecond cutoff does not
    # depend on the host timezone (naive utcnow() + timestamp() would).
    now = datetime.now(timezone.utc)
    cutoff = int((now - timedelta(days=DAYS_OLD)).timestamp() * 1000)

    for feature_path in feature_files:
        if not feature_path.exists():
            print(f"[WARN] {feature_path} does not exist.")
            continue

        df = pd.read_parquet(feature_path)
        old = df.loc[df['interval_timestamp'] < cutoff].copy()
        keep = df.loc[df['interval_timestamp'] >= cutoff].copy()

        if old.empty:
            print(f"[INFO] No records to archive in {feature_path}.")
            continue

        # Group by day (UTC) and write each group to a separate parquet file under archive/{day}/
        old['archive_date'] = pd.to_datetime(old['interval_timestamp'], unit='ms').dt.strftime('%Y%m%d')
        for day, group in old.groupby('archive_date'):
            day_dir = ARCHIVE_DIR / day
            day_dir.mkdir(parents=True, exist_ok=True)
            out_path = day_dir / f"{feature_path.stem}_archived_{day}.parquet"
            if out_path.exists():
                existing = pd.read_parquet(out_path)
                group = pd.concat([existing, group.drop(columns=['archive_date'])], ignore_index=True)
            else:
                group = group.drop(columns=['archive_date'])
            group.to_parquet(out_path, index=False)
            print(f"[ARCHIVE] {len(group)} records -> {out_path}")

        # Save the remaining (unarchived) records back to the feature file
        keep.to_parquet(feature_path, index=False)
        print(f"[INFO] Archived {len(old)} records from {feature_path}. {len(keep)} remain.")
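# --- Illustrative sketch (not called by the pipeline) ------------------------
# archive_old_records() above writes one parquet file per feature set per UTC day
# under data/merged/archive/<YYYYMMDD>/. The helper below is a hypothetical
# example of how such a day partition could be read back for inspection; the
# function name and its use are assumptions, not part of the original pipeline.
def load_archived_day(day: str) -> pd.DataFrame:
    """Concatenate every parquet file archived under ARCHIVE_DIR/<day>/."""
    day_dir = ARCHIVE_DIR / day
    frames = [pd.read_parquet(p) for p in sorted(day_dir.glob("*.parquet"))]
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()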
def store_in_cloud():
    # Import StorageHandler from cloud_utils, ensuring src is on sys.path
    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'src')))
    from data_cloud.cloud_utils import StorageHandler

    # Filebase credentials from env
    load_dotenv()
    endpoint_url = os.getenv("FILEBASE_ENDPOINT")
    access_key = os.getenv("FILEBASE_ACCESS_KEY")
    secret_key = os.getenv("FILEBASE_SECRET_KEY")
    bucket_name = os.getenv("FILEBASE_BUCKET")
    if not all([endpoint_url, access_key, secret_key, bucket_name]):
        print("[ERROR] Filebase credentials not set in environment.")
        return

    storage = StorageHandler(endpoint_url, access_key, secret_key, bucket_name)
    merged_dir = os.path.join("data", "merged")
    archive_dir = os.path.join(merged_dir, "archive")

    # Upload all files in merged except the archive tree (its per-day
    # subdirectories are handled separately below).
    for root, dirs, files in os.walk(merged_dir):
        if os.path.abspath(root).startswith(os.path.abspath(archive_dir)):
            continue
        for fname in files:
            local_path = os.path.join(root, fname)
            rel_path = os.path.relpath(local_path, "data")
            key = rel_path.replace(os.sep, "/")
            with open(local_path, "rb") as f:
                data = f.read()
            storage.upload(key, data)

    # Only upload archive files newer than DAYS_OLD days. Archived parquet files
    # are written into per-day subdirectories, so walk the tree rather than
    # listing only the top level of the archive directory.
    import time
    cutoff = time.time() - DAYS_OLD * 86400
    if os.path.exists(archive_dir):
        for root, _dirs, files in os.walk(archive_dir):
            for fname in files:
                local_path = os.path.join(root, fname)
                if os.path.getmtime(local_path) < cutoff:
                    continue
                rel_path = os.path.relpath(local_path, "data")
                key = rel_path.replace(os.sep, "/")
                with open(local_path, "rb") as f:
                    data = f.read()
                storage.upload(key, data)


# Save stocks and crypto features to data/merged/raw
def save_raw_features():
    import shutil

    raw_dir = Path('data/merged/raw')
    raw_dir.mkdir(parents=True, exist_ok=True)
    src_stocks = Path('data/merged/features/stocks_features.parquet')
    src_crypto = Path('data/merged/features/crypto_features.parquet')
    dst_stocks = raw_dir / 'stocks_features.parquet'
    dst_crypto = raw_dir / 'crypto_features.parquet'

    if src_stocks.exists():
        shutil.copy2(src_stocks, dst_stocks)
        print(f"[RAW] Saved stocks features to {dst_stocks}")
    else:
        print(f"[RAW] Source stocks features not found: {src_stocks}")

    if src_crypto.exists():
        shutil.copy2(src_crypto, dst_crypto)
        print(f"[RAW] Saved crypto features to {dst_crypto}")
    else:
        print(f"[RAW] Source crypto features not found: {src_crypto}")
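# --- Illustrative sketch (not wired into main) --------------------------------
# main() below repeats the same try/except pattern around optional steps. A
# hypothetical wrapper like this could express that pattern once; it reuses
# run_script() and subprocess.CalledProcessError and is shown only as a sketch,
# not as part of the original pipeline.
def run_optional(script, args=None, label=None):
    """Run a step but only warn (instead of aborting) if it exits non-zero."""
    try:
        run_script(script, args)
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] {label or script} failed: {e}")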
def main():
    print("[MergeOpt] Starting memory-optimized merge pipeline...")
    initial_memory = memory_optimizer.get_memory_usage()
    print(f"[MergeOpt] Initial memory: {initial_memory:.1f}MB")

    # Run all merge steps with memory monitoring
    run_script('merge_0.py')
    run_script('merge_1.py', [
        '--latest', 'data/advisorai-data/features/latest_features.parquet',
        '--finnhub', 'data/advisorai-data/features/latest_features.parquet',
        '--out', 'data/merged/features/merged_features.parquet'
    ])
    run_script('merge_2.py')
    run_script('merge_3.py')
    run_script('merge_4.py')
    run_script('separator.py')
    run_script('merge_5.py')
    run_script('merge_6.py')
    run_script('merge_7.py')

    save_raw_features()

    # Extract symbols from exchange symbol data before data fillers
    try:
        run_script('extract_symbols.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Symbol extraction failed: {e}")

    # Remove rows with null symbols after symbol extraction
    try:
        run_script('remove_null_symbols.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Null symbol removal failed: {e}")

    # Run normalization scripts with error handling (currently disabled)
    # run_script('stocks_data_filler.py')
    # try:
    #     run_script('crypto_data_filler.py')
    # except subprocess.CalledProcessError as e:
    #     print(f"[WARNING] Crypto data filler failed: {e}")

    # Merge temp files into merged - with error handling
    try:
        run_script('merge_temp.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Merge temp failed: {e}")

    try:
        run_script('merge_sant.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Santiment merge failed: {e}")

    try:
        run_script('merge_santiment_with_crypto.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Santiment-crypto merge failed: {e}")

    # Final comprehensive null value handling - clean up any remaining nulls (currently disabled)
    # try:
    #     run_script('run_final_null_handling.py')
    # except subprocess.CalledProcessError as e:
    #     print(f"[WARNING] Final null handling failed: {e}")

    # Normalize features (currently disabled)
    # run_script('normalize.py')

    # Normalize train files for both crypto and stocks (currently disabled)
    # run_script('norm/crypto.py', ['--train'])
    # run_script('norm/stocks.py', ['--train'])

    # Archive old records
    archive_old_records()

    # Generate and store full report
    run_script('full_report.py')

    # Store all merged data in cloud
    store_in_cloud()

    # Final memory check
    final_memory = memory_optimizer.get_memory_usage()
    print(f"[MergeOpt] Final memory usage: {final_memory:.1f}MB")
    if final_memory > 400:
        print("[MergeOpt] WARNING: High final memory usage")
        memory_optimizer.cleanup_after_script("final cleanup")

    print("[OK] All merge steps, null handling, normalization, and reporting completed successfully.")


if __name__ == "__main__":
    main()