import io
import os
import gc
import uuid
import torch
import psutil
import numpy as np
import cv2
import yaml
from typing import List, Dict
from datetime import datetime
from pathlib import Path
from PIL import Image
from concurrent.futures import ThreadPoolExecutor, as_completed

import uvicorn
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

from src.detection import YOLOv11Detector
from src.comparison import DamageComparator
from src.visualization import DamageVisualizer

# ============= HUGGING FACE SPACES DETECTION =============
IS_HUGGINGFACE_SPACE = os.environ.get("SPACE_ID") is not None
IS_ZERO_GPU = os.environ.get("ZERO_GPU") == "true"

if IS_HUGGINGFACE_SPACE:
    print("=" * 60)
    print("šŸ¤— HUGGING FACE SPACES DETECTED")
    print(f"   Space ID: {os.environ.get('SPACE_ID')}")
    print(f"   Hardware: {'Zero GPU' if IS_ZERO_GPU else 'CPU Basic (2 vCPU, 16GB RAM)'}")
    print("=" * 60)

# ============= EARLY OPTIMIZATION SETTINGS =============
def apply_huggingface_optimizations():
    """Apply optimizations for HuggingFace Spaces constraints before any models are loaded"""
    if not IS_HUGGINGFACE_SPACE:
        return

    # Force thread limits for 2 vCPU
    thread_limit = "2"
    os.environ['OMP_NUM_THREADS'] = thread_limit
    os.environ['MKL_NUM_THREADS'] = thread_limit
    os.environ['NUMEXPR_NUM_THREADS'] = thread_limit
    os.environ['OPENBLAS_NUM_THREADS'] = thread_limit
    os.environ['VECLIB_MAXIMUM_THREADS'] = thread_limit

    # PyTorch specific
    os.environ['TORCH_NUM_THREADS'] = thread_limit
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:512'

    # Disable debugging/profiling for performance
    os.environ['PYTHONOPTIMIZE'] = '1'

    torch.set_num_threads(2)
    torch.set_num_interop_threads(1)
    torch.set_grad_enabled(False)  # Inference only

    print("āœ… Applied HuggingFace Spaces optimizations")

# Apply optimizations EARLY
apply_huggingface_optimizations()

# ============= PERFORMANCE CONFIGURATION =============
class PerformanceConfig:
    """Smart performance configuration with HuggingFace detection"""

    def __init__(self):
        if IS_HUGGINGFACE_SPACE:
            self._configure_for_huggingface()
        else:
            self._auto_configure()

    def _configure_for_huggingface(self):
        """Optimized settings for HuggingFace Spaces"""
        # HF Spaces free tier: 2 vCPU, 16GB RAM
        self.physical_cores = 2
        self.logical_cores = 2
        self.total_memory_gb = 16.0

        # Conservative settings for stability
        self.max_thread_workers = 2
        self.max_process_workers = 1
        self.uvicorn_workers = 1  # CRITICAL: Must be 1

        # ONNX optimization
        self.onnx_intra_threads = 2
        self.onnx_inter_threads = 1

        # Small batch sizes for memory efficiency
        self.detection_batch_size = 1
        self.comparison_batch_size = 1

        self.optimization_level = "HuggingFace-Optimized"

        # Cache settings
        self.enable_model_cache = True
        self.max_cache_size = 100  # MB

        print("šŸ¤— HuggingFace Config:")
        print("   • 2 vCPU, 16GB RAM (enforced)")
        print("   • Single worker mode")
        print("   • Batch size: 1")
        print("   • ONNX preferred for CPU performance")

    def _auto_configure(self):
        """Auto-detect configuration for other environments"""
        self.physical_cores = psutil.cpu_count(logical=False) or 2
        self.logical_cores = psutil.cpu_count(logical=True) or 2
        self.total_memory_gb = psutil.virtual_memory().total / (1024**3)

        # Scale based on actual resources
        if self.physical_cores >= 8 and self.total_memory_gb >= 64:
            self._high_performance_config()
        elif self.physical_cores >= 4 and self.total_memory_gb >= 16:
            self._medium_performance_config()
        else:
            self._low_performance_config()

    def _high_performance_config(self):
        self.max_thread_workers = min(6, self.physical_cores - 2)
        self.max_process_workers = 2
        self.uvicorn_workers = min(4, self.physical_cores // 2)
        self.onnx_intra_threads = min(8, self.physical_cores)
        self.onnx_inter_threads = 2
        self.detection_batch_size = 4
        self.comparison_batch_size = 3
        self.optimization_level = "High-Performance"
        self.enable_model_cache = True
        self.max_cache_size = 500

    def _medium_performance_config(self):
        self.max_thread_workers = min(4, self.physical_cores)
        self.max_process_workers = 1
        self.uvicorn_workers = 2
        self.onnx_intra_threads = min(4, self.physical_cores)
        self.onnx_inter_threads = 1
        self.detection_batch_size = 2
        self.comparison_batch_size = 2
        self.optimization_level = "Medium-Performance"
        self.enable_model_cache = True
        self.max_cache_size = 200

    def _low_performance_config(self):
        self.max_thread_workers = 2
        self.max_process_workers = 1
        self.uvicorn_workers = 1
        self.onnx_intra_threads = 2
        self.onnx_inter_threads = 1
        self.detection_batch_size = 1
        self.comparison_batch_size = 1
        self.optimization_level = "Conservative"
        self.enable_model_cache = True
        self.max_cache_size = 100

# Initialize performance config
perf_config = PerformanceConfig()

# ============= FASTAPI APP SETUP =============
app = FastAPI(
    title="Car Damage Detection API",
    description=f"YOLOv11 + DINOv2 ReID ({'HuggingFace Spaces Optimized' if IS_HUGGINGFACE_SPACE else 'Auto-Optimized'})",
    version="2.0.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ============= MODEL PATHS =============
MODEL_PATHS = {
    0: "models_small/best.pt",
    1: "models_small_version_2/best.pt",
    2: "models_small_version3/best.pt",
    3: "models_medium/best.pt",
    4: "models_medium_version_2/best.pt",
    5: "model_medium_version3/best.pt",
    6: "models_small/best.onnx",
    7: "models_small_version_2/best.onnx",
    8: "models_small_version3/best.onnx",
    9: "models_medium/best.onnx",
    10: "models_medium_version_2/best.onnx",
    11: "model_medium_version3/best.onnx"
}

CONFIG_PATHS = {
    0: "config.yaml",
    1: "config_version2.yaml",
    2: "config_version3.yaml",
    3: "config.yaml",
    4: "config_version2.yaml",
    5: "config_version3.yaml",
    6: "config.yaml",
    7: "config_version2.yaml",
    8: "config_version3.yaml",
    9: "config.yaml",
    10: "config_version2.yaml",
    11: "config_version3.yaml"
}

# ============= GLOBAL MODEL CACHE =============
class ModelCache:
    """Simple model cache to avoid reloading"""

    def __init__(self):
        self.detector = None
        self.comparator = None
        self.visualizer = None
        self.current_model_index = None

    def get_or_load(self, model_index: int, prefer_onnx: bool = True):
        """Get cached model or load new one"""
        if self.current_model_index == model_index and self.detector is not None:
            return self.detector, self.comparator, self.visualizer

        # Clear old models
        self.clear()

        # Load new models
        self.detector = self._load_detector(model_index, prefer_onnx)

        config_file = CONFIG_PATHS.get(model_index, "config.yaml")
        self.comparator = DamageComparator(config_path=config_file)
        self.visualizer = DamageVisualizer(config_path=config_file)

        self.current_model_index = model_index
        return self.detector, self.comparator, self.visualizer

    def _load_detector(self, model_index: int, prefer_onnx: bool):
        """Load detector with HuggingFace optimizations"""
        # For HuggingFace, strongly prefer ONNX
        if IS_HUGGINGFACE_SPACE and prefer_onnx:
            # Try to use ONNX version
            onnx_mapping = {0: 6, 1: 7, 2: 8, 3: 9, 4: 10, 5: 11}
            if model_index in onnx_mapping:
                onnx_index = onnx_mapping[model_index]
                if Path(MODEL_PATHS[onnx_index]).exists():
                    model_index = onnx_index
                    print("šŸš€ Using ONNX model for better CPU performance")

        config_file = CONFIG_PATHS.get(model_index, "config.yaml")

        # Load and update config
        with open(config_file, 'r') as f:
            config = yaml.safe_load(f)

        config['model']['path'] = MODEL_PATHS[model_index]

        # Add performance settings
        if 'performance' not in config:
            config['performance'] = {}

        config['performance'].update({
            'max_thread_workers': perf_config.max_thread_workers,
            'onnx_intra_threads': perf_config.onnx_intra_threads,
            'onnx_inter_threads': perf_config.onnx_inter_threads,
            'detection_batch_size': perf_config.detection_batch_size
        })

        # Save temp config
        temp_config = f'temp_config_{model_index}_optimized.yaml'
        with open(temp_config, 'w') as f:
            yaml.dump(config, f, default_flow_style=False)

        return YOLOv11Detector(config_path=temp_config)

    def clear(self):
        """Clear cached models"""
        if self.detector:
            del self.detector
        if self.comparator:
            del self.comparator
        if self.visualizer:
            del self.visualizer

        self.detector = None
        self.comparator = None
        self.visualizer = None

        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

# Initialize model cache
model_cache = ModelCache()

# ============= LAZY INITIALIZATION =============
def get_models(model_index: int = 7, prefer_onnx: bool = True):
    """Get models with lazy loading"""
    # Default to Small v2 ONNX for HuggingFace
    if IS_HUGGINGFACE_SPACE and model_index not in range(12):
        model_index = 7  # Small v2 ONNX

    return model_cache.get_or_load(model_index, prefer_onnx)

# ============= STARTUP EVENT =============
@app.on_event("startup")
async def startup_event():
    """Initialize on startup"""
    print("\n" + "=" * 60)
    print("šŸš€ STARTING CAR DAMAGE DETECTION API")
    print(f"šŸ“ Environment: {'HuggingFace Spaces' if IS_HUGGINGFACE_SPACE else 'Standard'}")
    print(f"āš™ļø Optimization: {perf_config.optimization_level}")
    print(f"šŸ’¾ Memory: {perf_config.total_memory_gb:.1f}GB")
    print(f"šŸ”§ CPU: {perf_config.physical_cores} cores")
    print("=" * 60 + "\n")

    # Create directories
    Path("uploads").mkdir(exist_ok=True)
    Path("results").mkdir(exist_ok=True)

    # Preload default model for faster first request
    if IS_HUGGINGFACE_SPACE:
        print("šŸ“¦ Preloading default model...")
        get_models(7, prefer_onnx=True)  # Small v2 ONNX
        gc.collect()

    print("āœ… Ready to serve requests!\n")

# Mount static files
# The directory must exist at import time: StaticFiles checks it when mounted,
# which happens before the startup event runs.
Path("uploads").mkdir(exist_ok=True)
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")

# ============= API ENDPOINTS =============
@app.get("/")
async def root():
    """Root endpoint with environment info"""
    memory = psutil.virtual_memory()
    cpu_percent = psutil.cpu_percent(interval=0.1)

    return {
        "message": "Car Damage Detection API",
        "version": "2.0.0",
        "environment": {
            "platform": "HuggingFace Spaces" if IS_HUGGINGFACE_SPACE else "Standard",
            "optimization": perf_config.optimization_level,
            "hardware": {
                "cpu_cores": perf_config.physical_cores,
                "memory_gb": f"{perf_config.total_memory_gb:.1f}",
                "cpu_usage": f"{cpu_percent:.1f}%",
                "memory_usage": f"{memory.percent:.1f}%"
            }
        },
        "performance": {
            "max_workers": perf_config.max_thread_workers,
            "batch_size": perf_config.detection_batch_size,
            "preferred_model": "ONNX (CPU optimized)" if IS_HUGGINGFACE_SPACE else "Auto"
        },
        "endpoints": {
            "/docs": "API documentation",
            "/detect": "Single/Multi image detection",
            "/compare": "Compare before/after images",
            "/health": "Health check",
            "/performance": "Performance metrics"
        }
    }

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
"status": "healthy", "timestamp": datetime.now().isoformat(), "environment": "HuggingFace" if IS_HUGGINGFACE_SPACE else "Standard" } @app.get("/performance") async def performance_metrics(): """Real-time performance metrics""" cpu_percent = psutil.cpu_percent(interval=1) memory = psutil.virtual_memory() # Calculate recommended settings high_load = cpu_percent > 70 or memory.percent > 70 return { "current": { "cpu_percent": cpu_percent, "memory_percent": memory.percent, "memory_available_gb": memory.available / (1024**3) }, "limits": { "cpu_cores": perf_config.physical_cores, "memory_gb": perf_config.total_memory_gb }, "recommendations": { "reduce_load": high_load, "suggested_batch_size": 1 if high_load else perf_config.detection_batch_size, "suggested_workers": 1 if high_load else perf_config.max_thread_workers } } @app.post("/detect") async def detect_damage( file: UploadFile = File(None), files: List[UploadFile] = File(None), select_models: int = Form(7), # Default to ONNX prefer_onnx: bool = Form(True) ): """Optimized detection endpoint""" try: # Get models detector, comparator, visualizer = get_models(select_models, prefer_onnx) # Process single file if file: contents = await file.read() image = Image.open(io.BytesIO(contents)).convert("RGB") image_np = np.array(image) image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) # Detect detections = detector.detect(image_bgr) # Visualize visualized = visualizer.draw_detections(image_bgr, detections, 'new_damage') # Save filename = f"detection_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.jpg" output_path = Path("uploads") / filename cv2.imwrite(str(output_path), visualized) return JSONResponse({ "status": "success", "detections": detections, "statistics": { "total_damages": len(detections['boxes']), "damage_types": list(set(detections['classes'])) }, "visualized_image_url": f"/uploads/{filename}" }) # Process multiple files elif files: results = [] for f in files: contents = await f.read() image = Image.open(io.BytesIO(contents)).convert("RGB") image_np = np.array(image) image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) detections = detector.detect(image_bgr) results.append(detections) # Memory cleanup del image, image_np gc.collect() return JSONResponse({ "status": "success", "results": results, "total_images": len(files) }) else: raise HTTPException(400, "No image provided") except Exception as e: gc.collect() raise HTTPException(500, f"Detection failed: {str(e)}") @app.post("/compare") async def compare_damages( before_images: List[UploadFile] = File(...), after_images: List[UploadFile] = File(...), select_models: int = Form(7), prefer_onnx: bool = Form(True) ): """Optimized comparison endpoint""" try: # Validate if len(before_images) != len(after_images): raise HTTPException(400, "Number of before/after images must match") if len(before_images) > 6: raise HTTPException(400, "Maximum 6 image pairs allowed") # Get models detector, comparator, visualizer = get_models(select_models, prefer_onnx) results = [] # Process each pair for before_file, after_file in zip(before_images, after_images): # Read images before_contents = await before_file.read() after_contents = await after_file.read() before_img = Image.open(io.BytesIO(before_contents)).convert("RGB") after_img = Image.open(io.BytesIO(after_contents)).convert("RGB") before_bgr = cv2.cvtColor(np.array(before_img), cv2.COLOR_RGB2BGR) after_bgr = cv2.cvtColor(np.array(after_img), cv2.COLOR_RGB2BGR) # Detect before_det = detector.detect(before_bgr) after_det = 
            # Compare
            comparison = comparator.analyze_damage_status(
                before_det, after_det, before_bgr, after_bgr
            )

            results.append({
                "before_damages": len(before_det['boxes']),
                "after_damages": len(after_det['boxes']),
                "comparison": comparison
            })

            # Cleanup
            del before_img, after_img, before_bgr, after_bgr
            gc.collect()

        return JSONResponse({
            "status": "success",
            "comparisons": results,
            "total_pairs": len(results)
        })

    except HTTPException:
        # Preserve validation errors (400) instead of converting them to 500s
        raise
    except Exception as e:
        gc.collect()
        raise HTTPException(500, f"Comparison failed: {str(e)}")

# ============= MAIN ENTRY POINT =============
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))

    if IS_HUGGINGFACE_SPACE:
        # HuggingFace Spaces specific
        print("\nšŸ¤— Starting server for HuggingFace Spaces...")
        print(f"šŸ“ Port: {port}")
        print("āš ļø Using single worker mode for stability\n")

        uvicorn.run(
            app,  # Direct app reference
            host="0.0.0.0",
            port=port,
            workers=1,  # MUST be 1 for HuggingFace
            log_level="info",
            access_log=False  # Reduce overhead
        )
    else:
        # Standard deployment
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=port,
            workers=perf_config.uvicorn_workers,
            log_level="info",
            reload=False
        )
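
# ============= EXAMPLE REQUESTS (reference only) =============
# A minimal usage sketch, assuming the server is running locally on the default
# port (7860) and that the sample image files shown here exist; the file names
# are placeholders. Field names match the endpoint signatures defined above.
#
#   # Single-image detection with the default Small v2 ONNX model (index 7)
#   curl -X POST "http://localhost:7860/detect" \
#        -F "file=@car_front.jpg" \
#        -F "select_models=7"
#
#   # Before/after comparison of one image pair (repeat -F fields for more pairs)
#   curl -X POST "http://localhost:7860/compare" \
#        -F "before_images=@before_1.jpg" \
#        -F "after_images=@after_1.jpg" \
#        -F "select_models=7"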