#!/usr/bin/env python3
"""
Memory-Optimized Main Pipeline for AdvisorAI Data Enhanced
Addresses critical memory issues causing instance failures (512MB limit)
"""
import sys
import os
import gc
import psutil
from datetime import datetime
from contextlib import contextmanager

# Make the parent directory, this directory, and fetchers/ importable ahead of any
# installed packages, so the local `fetchers` and `merge` packages resolve first
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "fetchers")))

class MemoryMonitor:
    """Memory monitoring and optimization utility"""

    def __init__(self, max_memory_mb=450):  # Set to 450MB to stay under 512MB limit
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process()

    def get_memory_usage(self):
        """Get current memory usage in MB"""
        return self.process.memory_info().rss / 1024 / 1024

    def check_and_cleanup(self, operation_name=""):
        """Check memory usage and cleanup if needed"""
        memory_mb = self.get_memory_usage()
        if memory_mb > self.max_memory_mb * 0.8:  # 80% threshold (360MB)
            print(f"[MemOpt] High memory usage during {operation_name}: {memory_mb:.1f}MB")
            collected = gc.collect()
            new_memory_mb = self.get_memory_usage()
            print(f"[MemOpt] Memory after GC: {new_memory_mb:.1f}MB (freed {collected} objects)")
            if new_memory_mb > self.max_memory_mb * 0.9:  # Still high (405MB)
                print("[MemOpt] WARNING: Memory still high after cleanup")
        return memory_mb

    @contextmanager
    def memory_context(self, operation_name):
        """Context manager for memory monitoring"""
        start_memory = self.get_memory_usage()
        print(f"[MemOpt] Starting {operation_name} - Memory: {start_memory:.1f}MB")
        try:
            yield
        finally:
            end_memory = self.get_memory_usage()
            diff = end_memory - start_memory
            print(f"[MemOpt] Finished {operation_name} - Memory: {end_memory:.1f}MB (Δ{diff:+.1f}MB)")
            # Force cleanup if memory is getting high
            if end_memory > self.max_memory_mb * 0.8:
                print(f"[MemOpt] Memory high after {operation_name}, forcing cleanup...")
                gc.collect()
                final_memory = self.get_memory_usage()
                print(f"[MemOpt] Memory after cleanup: {final_memory:.1f}MB")

def run_fetchers_optimized(memory_monitor):
    """Run fetchers with memory optimization"""
    try:
        with memory_monitor.memory_context("Fetchers"):
            # Import fetchers main (only when needed)
            from fetchers.main import main as fetchers_main

            print("[Pipeline] Starting data fetchers (memory optimized)...")
            result = fetchers_main()

            # Clear imports to free memory
            if 'fetchers.main' in sys.modules:
                del sys.modules['fetchers.main']

            # Force cleanup after fetchers
            memory_monitor.check_and_cleanup("Fetchers")
            return result
    except Exception as e:
        print(f"[Pipeline] Error in fetchers: {e}")
        # Still cleanup on error
        memory_monitor.check_and_cleanup("Fetchers (error)")
        return False

def run_merge_optimized(memory_monitor):
    """Run merge operations with memory optimization"""
    try:
        with memory_monitor.memory_context("Merge"):
            # Import merge main (only when needed)
            from merge import main as merge_main

            print("[Pipeline] Starting data merge (memory optimized)...")
            result = merge_main.main()

            # Clear imports to free memory
            if 'merge.main' in sys.modules:
                del sys.modules['merge.main']

            # Force cleanup after merge
            memory_monitor.check_and_cleanup("Merge")
            return result
    except Exception as e:
        print(f"[Pipeline] Error in merge: {e}")
        # Still cleanup on error
        memory_monitor.check_and_cleanup("Merge (error)")
        return False
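
# Illustrative only: a further pipeline step could follow the same shape.
# run_export_optimized and exporters.main are hypothetical names, not part of
# this project:
#
#   def run_export_optimized(memory_monitor):
#       try:
#           with memory_monitor.memory_context("Export"):
#               from exporters.main import main as export_main  # hypothetical module
#               result = export_main()
#               if 'exporters.main' in sys.modules:
#                   del sys.modules['exporters.main']
#               memory_monitor.check_and_cleanup("Export")
#               return result
#       except Exception as e:
#           print(f"[Pipeline] Error in export: {e}")
#           memory_monitor.check_and_cleanup("Export (error)")
#           return False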

def main():
    """Memory-optimized main pipeline execution"""
    print("AdvisorAI Data Pipeline - Memory Optimized")
    print("=" * 50)
    print(f"Pipeline started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Initialize memory monitor
    memory_monitor = MemoryMonitor(max_memory_mb=450)  # Stay under 512MB limit
    initial_memory = memory_monitor.get_memory_usage()
    print(f"[Pipeline] Initial memory usage: {initial_memory:.1f}MB")

    # Check if we're already too high
    if initial_memory > 200:
        print(f"[Pipeline] WARNING: High initial memory usage: {initial_memory:.1f}MB")
        memory_monitor.check_and_cleanup("Initial")

    try:
        # Step 1: Run fetchers with memory optimization
        print("\n" + "="*30)
        print("STEP 1: DATA FETCHERS")
        print("="*30)
        fetchers_success = run_fetchers_optimized(memory_monitor)
        if not fetchers_success:
            print("[Pipeline] Fetchers failed, but continuing to merge existing data...")

        # Memory checkpoint
        mid_memory = memory_monitor.get_memory_usage()
        print(f"\n[Pipeline] Memory after fetchers: {mid_memory:.1f}MB")
        if mid_memory > 400:  # Getting close to limit
            print("[Pipeline] Memory high after fetchers, forcing cleanup...")
            gc.collect()
            mid_memory = memory_monitor.get_memory_usage()
            print(f"[Pipeline] Memory after cleanup: {mid_memory:.1f}MB")

        # Step 2: Run merge with memory optimization
        print("\n" + "="*30)
        print("STEP 2: DATA MERGE")
        print("="*30)
        merge_success = run_merge_optimized(memory_monitor)
        if not merge_success:
            print("[Pipeline] Merge failed")
            return False

        # Final memory check
        final_memory = memory_monitor.get_memory_usage()
        print(f"\n[Pipeline] Final memory usage: {final_memory:.1f}MB")
        if final_memory > 450:  # Close to 512MB limit
            print("⚠️ WARNING: Memory usage approaching limit - optimization needed")

        print(f"Pipeline ended at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("[OK] All steps completed successfully!")
        return True
    except Exception as e:
        import traceback
        print(f"[ERROR] Pipeline execution failed: {e}")
        print(traceback.format_exc())
        # Emergency memory cleanup
        print("[Pipeline] Emergency memory cleanup...")
        memory_monitor.check_and_cleanup("Emergency")
        return False

if __name__ == "__main__":
    success = main()
    if not success:
        sys.exit(1)