advisorai-data-enhanced / src /merge /main_memory_optimized.py
Maaroufabousaleh
f
c49b21b
raw
history blame
2.79 kB
"""
Memory-Optimized Merge Wrapper
Wraps the main merge function with memory monitoring and cleanup
"""
import gc
import os
import sys
import psutil
from pathlib import Path
class MergeMemoryOptimizer:
"""Memory optimizer specifically for merge operations"""
def __init__(self, max_memory_mb=350): # Conservative limit for merge operations
self.max_memory_mb = max_memory_mb
self.process = psutil.Process()
def get_memory_usage(self):
"""Get current memory usage in MB"""
return self.process.memory_info().rss / 1024 / 1024
def cleanup_after_script(self, script_name):
"""Cleanup after running a merge script"""
# Force garbage collection
collected = gc.collect()
# Clear any cached modules
modules_to_clear = [m for m in sys.modules.keys() if 'merge' in m or 'pandas' in m]
for module in modules_to_clear:
if module in sys.modules and module != __name__:
try:
del sys.modules[module]
except:
pass
memory_after = self.get_memory_usage()
print(f"[MemOpt] After {script_name}: {memory_after:.1f}MB (freed {collected} objects)")
if memory_after > self.max_memory_mb:
print(f"[MemOpt] WARNING: High memory after {script_name}: {memory_after:.1f}MB")
return memory_after
# Import the original main function
def main():
"""Memory-optimized wrapper for merge main"""
optimizer = MergeMemoryOptimizer()
initial_memory = optimizer.get_memory_usage()
print(f"[MergeOpt] Starting merge operations - Memory: {initial_memory:.1f}MB")
try:
# Import and run the original main function
from merge.main_original import main as original_main
# Monitor memory during execution
result = original_main()
# Final cleanup
final_memory = optimizer.cleanup_after_script("all merge operations")
print(f"[MergeOpt] Final merge memory: {final_memory:.1f}MB")
return result
except ImportError:
# Fallback to current main if original doesn't exist
print("[MergeOpt] No original main found, running current implementation...")
# Import the current implementation
import merge.main as current_main
result = current_main.main()
# Cleanup
optimizer.cleanup_after_script("current merge implementation")
return result
except Exception as e:
print(f"[MergeOpt] Error in merge operations: {e}")
optimizer.cleanup_after_script("error cleanup")
raise
if __name__ == "__main__":
main()