#!/usr/bin/env python3
"""
Memory-Optimized Main Pipeline for AdvisorAI Data Enhanced

Addresses critical memory issues causing instance failures (512MB limit).
"""

import sys
import os
import gc
import psutil
from datetime import datetime
from contextlib import contextmanager

# Add paths
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "fetchers")))


class MemoryMonitor:
    """Memory monitoring and optimization utility"""

    def __init__(self, max_memory_mb=450):  # Set to 450MB to stay under 512MB limit
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process()

    def get_memory_usage(self):
        """Get current memory usage in MB"""
        return self.process.memory_info().rss / 1024 / 1024

    def check_and_cleanup(self, operation_name=""):
        """Check memory usage and cleanup if needed"""
        memory_mb = self.get_memory_usage()
        if memory_mb > self.max_memory_mb * 0.8:  # 80% threshold (360MB)
            print(f"[MemOpt] High memory usage during {operation_name}: {memory_mb:.1f}MB")
            collected = gc.collect()
            new_memory_mb = self.get_memory_usage()
            print(f"[MemOpt] Memory after GC: {new_memory_mb:.1f}MB (freed {collected} objects)")
            if new_memory_mb > self.max_memory_mb * 0.9:  # Still high (405MB)
                print("[MemOpt] WARNING: Memory still high after cleanup")
        return memory_mb

    @contextmanager
    def memory_context(self, operation_name):
        """Context manager for memory monitoring"""
        start_memory = self.get_memory_usage()
        print(f"[MemOpt] Starting {operation_name} - Memory: {start_memory:.1f}MB")
        try:
            yield
        finally:
            end_memory = self.get_memory_usage()
            diff = end_memory - start_memory
            print(f"[MemOpt] Finished {operation_name} - Memory: {end_memory:.1f}MB (Δ{diff:+.1f}MB)")
            # Force cleanup if memory is getting high
            if end_memory > self.max_memory_mb * 0.8:
                print(f"[MemOpt] Memory high after {operation_name}, forcing cleanup...")
                gc.collect()
                final_memory = self.get_memory_usage()
                print(f"[MemOpt] Memory after cleanup: {final_memory:.1f}MB")


def run_fetchers_optimized(memory_monitor):
    """Run fetchers with memory optimization"""
    try:
        with memory_monitor.memory_context("Fetchers"):
            # Import fetchers main (only when needed)
            from fetchers.main import main as fetchers_main

            print("[Pipeline] Starting data fetchers (memory optimized)...")
            result = fetchers_main()

            # Clear imports to free memory
            if 'fetchers.main' in sys.modules:
                del sys.modules['fetchers.main']

        # Force cleanup after fetchers
        memory_monitor.check_and_cleanup("Fetchers")
        return result
    except Exception as e:
        print(f"[Pipeline] Error in fetchers: {e}")
        # Still cleanup on error
        memory_monitor.check_and_cleanup("Fetchers (error)")
        return False


def run_merge_optimized(memory_monitor):
    """Run merge operations with memory optimization"""
    try:
        with memory_monitor.memory_context("Merge"):
            # Import merge main (only when needed)
            from merge import main as merge_main

            print("[Pipeline] Starting data merge (memory optimized)...")
            result = merge_main.main()

            # Clear imports to free memory
            if 'merge.main' in sys.modules:
                del sys.modules['merge.main']

        # Force cleanup after merge
        memory_monitor.check_and_cleanup("Merge")
        return result
    except Exception as e:
        print(f"[Pipeline] Error in merge: {e}")
        # Still cleanup on error
        memory_monitor.check_and_cleanup("Merge (error)")
        return False
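
# Usage sketch (illustrative only, not executed): the same lazy-import /
# unload pattern used above can wrap any additional pipeline stage. The
# module name `reports.main` below is hypothetical and not part of this repo.
#
#     def run_reports_optimized(memory_monitor):
#         try:
#             with memory_monitor.memory_context("Reports"):
#                 from reports.main import main as reports_main  # import lazily
#                 result = reports_main()
#                 sys.modules.pop('reports.main', None)  # drop module reference
#             memory_monitor.check_and_cleanup("Reports")
#             return result
#         except Exception as e:
#             print(f"[Pipeline] Error in reports: {e}")
#             memory_monitor.check_and_cleanup("Reports (error)")
#             return False
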
print(f"Pipeline started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # Initialize memory monitor memory_monitor = MemoryMonitor(max_memory_mb=450) # Stay under 512MB limit initial_memory = memory_monitor.get_memory_usage() print(f"[Pipeline] Initial memory usage: {initial_memory:.1f}MB") # Check if we're already too high if initial_memory > 200: print(f"[Pipeline] WARNING: High initial memory usage: {initial_memory:.1f}MB") memory_monitor.check_and_cleanup("Initial") try: # Step 1: Run fetchers with memory optimization print("\n" + "="*30) print("STEP 1: DATA FETCHERS") print("="*30) fetchers_success = run_fetchers_optimized(memory_monitor) if not fetchers_success: print("[Pipeline] Fetchers failed, but continuing to merge existing data...") # Memory checkpoint mid_memory = memory_monitor.get_memory_usage() print(f"\n[Pipeline] Memory after fetchers: {mid_memory:.1f}MB") if mid_memory > 400: # Getting close to limit print("[Pipeline] Memory high after fetchers, forcing cleanup...") gc.collect() mid_memory = memory_monitor.get_memory_usage() print(f"[Pipeline] Memory after cleanup: {mid_memory:.1f}MB") # Step 2: Run merge with memory optimization print("\n" + "="*30) print("STEP 2: DATA MERGE") print("="*30) merge_success = run_merge_optimized(memory_monitor) if not merge_success: print("[Pipeline] Merge failed") return False # Final memory check final_memory = memory_monitor.get_memory_usage() print(f"\n[Pipeline] Final memory usage: {final_memory:.1f}MB") if final_memory > 450: # Close to 512MB limit print("⚠️ WARNING: Memory usage approaching limit - optimization needed") print(f"Pipeline ended at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("[OK] All steps completed successfully!") return True except Exception as e: import traceback print(f"[ERROR] Pipeline execution failed: {e}") print(traceback.format_exc()) # Emergency memory cleanup print("[Pipeline] Emergency memory cleanup...") memory_monitor.check_and_cleanup("Emergency") return False if __name__ == "__main__": success = main() if not success: sys.exit(1)