#!/usr/bin/env python3
"""
Memory-Optimized Main Pipeline for AdvisorAI Data Enhanced
Addresses critical memory issues causing instance failures (512MB limit)
"""
import sys
import os
import gc
import psutil
from datetime import datetime
from contextlib import contextmanager

# Add paths
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "fetchers")))
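# The inserts above let the sibling packages (e.g., fetchers, merge) be
# imported as top-level modules no matter which directory the script is
# launched from.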


class MemoryMonitor:
    """Memory monitoring and optimization utility"""

    def __init__(self, max_memory_mb=450):  # Set to 450MB to stay under the 512MB limit
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process()

    def get_memory_usage(self):
        """Get current memory usage (RSS) in MB"""
        return self.process.memory_info().rss / 1024 / 1024

    def check_and_cleanup(self, operation_name=""):
        """Check memory usage and run a garbage-collection pass if needed"""
        memory_mb = self.get_memory_usage()
        if memory_mb > self.max_memory_mb * 0.8:  # 80% threshold (360MB)
            print(f"[MemOpt] High memory usage during {operation_name}: {memory_mb:.1f}MB")
            collected = gc.collect()
            new_memory_mb = self.get_memory_usage()
            print(f"[MemOpt] Memory after GC: {new_memory_mb:.1f}MB (collected {collected} objects)")
            if new_memory_mb > self.max_memory_mb * 0.9:  # Still high (405MB)
                print("[MemOpt] WARNING: Memory still high after cleanup")
        return memory_mb

    @contextmanager
    def memory_context(self, operation_name):
        """Context manager for memory monitoring"""
        start_memory = self.get_memory_usage()
        print(f"[MemOpt] Starting {operation_name} - Memory: {start_memory:.1f}MB")
        try:
            yield
        finally:
            end_memory = self.get_memory_usage()
            diff = end_memory - start_memory
            print(f"[MemOpt] Finished {operation_name} - Memory: {end_memory:.1f}MB (Δ{diff:+.1f}MB)")
            # Force cleanup if memory is getting high
            if end_memory > self.max_memory_mb * 0.8:
                print(f"[MemOpt] Memory high after {operation_name}, forcing cleanup...")
                gc.collect()
                final_memory = self.get_memory_usage()
                print(f"[MemOpt] Memory after cleanup: {final_memory:.1f}MB")


def run_fetchers_optimized(memory_monitor):
    """Run fetchers with memory optimization"""
    try:
        with memory_monitor.memory_context("Fetchers"):
            # Import fetchers main (only when needed)
            from fetchers.main import main as fetchers_main
            print("[Pipeline] Starting data fetchers (memory optimized)...")
            result = fetchers_main()
        # Clear imports to free memory
        if 'fetchers.main' in sys.modules:
            del sys.modules['fetchers.main']
        # Force cleanup after fetchers
        memory_monitor.check_and_cleanup("Fetchers")
        return result
    except Exception as e:
        print(f"[Pipeline] Error in fetchers: {e}")
        # Still cleanup on error
        memory_monitor.check_and_cleanup("Fetchers (error)")
        return False


def run_merge_optimized(memory_monitor):
    """Run merge operations with memory optimization"""
    try:
        with memory_monitor.memory_context("Merge"):
            # Import merge main (only when needed)
            from merge import main as merge_main
            print("[Pipeline] Starting data merge (memory optimized)...")
            result = merge_main.main()
        # Clear imports to free memory
        if 'merge.main' in sys.modules:
            del sys.modules['merge.main']
        # Force cleanup after merge
        memory_monitor.check_and_cleanup("Merge")
        return result
    except Exception as e:
        print(f"[Pipeline] Error in merge: {e}")
        # Still cleanup on error
        memory_monitor.check_and_cleanup("Merge (error)")
        return False
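
# Note: deleting an entry from sys.modules only lets Python reclaim the module
# once no other references to it remain; the gc.collect() inside
# check_and_cleanup() then sweeps any remaining unreachable cycles.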


def main():
    """Memory-optimized main pipeline execution"""
    print("AdvisorAI Data Pipeline - Memory Optimized")
    print("=" * 50)
    print(f"Pipeline started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Initialize memory monitor
    memory_monitor = MemoryMonitor(max_memory_mb=450)  # Stay under 512MB limit
    initial_memory = memory_monitor.get_memory_usage()
    print(f"[Pipeline] Initial memory usage: {initial_memory:.1f}MB")

    # Check if we're already too high
    if initial_memory > 200:
        print(f"[Pipeline] WARNING: High initial memory usage: {initial_memory:.1f}MB")
        memory_monitor.check_and_cleanup("Initial")

    try:
        # Step 1: Run fetchers with memory optimization
        print("\n" + "=" * 30)
        print("STEP 1: DATA FETCHERS")
        print("=" * 30)
        fetchers_success = run_fetchers_optimized(memory_monitor)
        if not fetchers_success:
            print("[Pipeline] Fetchers failed, but continuing to merge existing data...")

        # Memory checkpoint
        mid_memory = memory_monitor.get_memory_usage()
        print(f"\n[Pipeline] Memory after fetchers: {mid_memory:.1f}MB")
        if mid_memory > 400:  # Getting close to limit
            print("[Pipeline] Memory high after fetchers, forcing cleanup...")
            gc.collect()
            mid_memory = memory_monitor.get_memory_usage()
            print(f"[Pipeline] Memory after cleanup: {mid_memory:.1f}MB")

        # Step 2: Run merge with memory optimization
        print("\n" + "=" * 30)
        print("STEP 2: DATA MERGE")
        print("=" * 30)
        merge_success = run_merge_optimized(memory_monitor)
        if not merge_success:
            print("[Pipeline] Merge failed")
            return False

        # Final memory check
        final_memory = memory_monitor.get_memory_usage()
        print(f"\n[Pipeline] Final memory usage: {final_memory:.1f}MB")
        if final_memory > 450:  # Close to 512MB limit
            print("⚠️ WARNING: Memory usage approaching limit - optimization needed")

        print(f"Pipeline ended at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("[OK] All steps completed successfully!")
        return True
    except Exception as e:
        import traceback
        print(f"[ERROR] Pipeline execution failed: {e}")
        print(traceback.format_exc())
        # Emergency memory cleanup
        print("[Pipeline] Emergency memory cleanup...")
        memory_monitor.check_and_cleanup("Emergency")
        return False


if __name__ == "__main__":
    success = main()
    if not success:
        sys.exit(1)
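
# The script exits with status 0 on success and 1 on failure, so an external
# scheduler (cron, CI job, etc.) can detect failed pipeline runs.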