import numpy as np from typing import List, Dict import cv2 from pathlib import Path import torch import yaml import onnxruntime as ort import os import psutil def _get_optimal_threads(): """Calculate optimal thread count for current system""" physical_cores = psutil.cpu_count(logical=False) logical_cores = psutil.cpu_count(logical=True) # Optimal intra-op threads = physical cores # For high-performance scenarios, use physical cores intra_threads = physical_cores if physical_cores else 4 print(f"System info: {physical_cores} physical cores, {logical_cores} logical cores") print(f"Using {intra_threads} intra-op threads for optimal performance") return intra_threads def _preprocess_image_optimized(image: np.ndarray) -> np.ndarray: """Optimized preprocessing for minimal overhead""" height, width = image.shape[:2] # Resize with optimal interpolation img_resized = cv2.resize(image, (640, 640), interpolation=cv2.INTER_LINEAR) # RGB conversion (most efficient method) img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB) # Normalize and transpose in one operation (memory efficient) img_normalized = img_rgb.astype(np.float32, copy=False) / 255.0 img_transposed = np.transpose(img_normalized, (2, 0, 1)) img_batch = np.expand_dims(img_transposed, axis=0) return img_batch, height, width class YOLOv11Detector: """YOLOv11 detector optimized for ONNX Runtime v1.19 with opset 21""" def __init__(self, config_path: str = "config.yaml"): """Initialize YOLOv11 detector with maximum ONNX Runtime optimizations""" with open(config_path, 'r') as f: self.config = yaml.safe_load(f) model_path = self.config['model']['path'] # Check which model file exists if not Path(model_path).exists(): # Try to find available model files model_dir = Path("models") if (model_dir / "best.pt").exists(): model_path = str(model_dir / "best.pt") print(f"Using best.pt model from training") elif (model_dir / "last.pt").exists(): model_path = str(model_dir / "last.pt") print(f"Using last.pt checkpoint model") elif (model_dir / "best.onnx").exists(): model_path = str(model_dir / "best.onnx") print(f"Using best.onnx model") else: raise FileNotFoundError(f"No model files found in models/ directory!") self.model_path = model_path self.device = self.config['model']['device'] self.confidence = self.config['model']['confidence'] self.iou_threshold = self.config['model']['iou_threshold'] self.classes = self.config['detection']['classes'] # Load model based on extension if self.model_path.endswith('.onnx'): self._load_onnx_model_optimized() # Will use optimizations for ONNX models else: self._load_pytorch_model() # Keep original PyTorch logic def _load_pytorch_model(self): """Load PyTorch model using Ultralytics""" from ultralytics import YOLO self.model = YOLO(self.model_path) # Set model to appropriate device if self.device == 'cuda:0' and torch.cuda.is_available(): self.model.to('cuda') else: self.model.to('cpu') print(f"Loaded PyTorch model: {self.model_path} on device: {self.device}") def _load_onnx_model_optimized(self): """Load ONNX model with MAXIMUM optimizations for v1.19 + opset 21""" # Get optimal thread count intra_threads = _get_optimal_threads() # Configure MAXIMUM performance session options sess_options = ort.SessionOptions() # === GRAPH OPTIMIZATIONS (Level: ALL) === # Enable ALL optimizations including layout optimizations sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL # === THREADING OPTIMIZATIONS === # Intra-op parallelism (within operators) - use physical cores sess_options.intra_op_num_threads = intra_threads # Inter-op parallelism (between operators) - keep at 1 for sequential execution # Sequential execution often performs better than parallel for single inference sess_options.inter_op_num_threads = 1 sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL # === MEMORY OPTIMIZATIONS === # Enable memory pattern optimization (reduces memory allocation overhead) sess_options.enable_mem_pattern = True # Enable memory arena optimization (better memory reuse) sess_options.enable_cpu_mem_arena = True # === CPU PERFORMANCE OPTIMIZATIONS === # Allow threads to spin waiting for work (trades CPU for latency) sess_options.add_session_config_entry("session.intra_op.allow_spinning", "1") sess_options.add_session_config_entry("session.inter_op.allow_spinning", "1") # Dynamic cost model for better load balancing (reduces latency variance) # Best value for dynamic_block_base is 4 according to docs sess_options.add_session_config_entry("session.intra_op.dynamic_block_base", "4") # For systems with >64 logical cores, use lock-free queues logical_cores = psutil.cpu_count(logical=True) if logical_cores and logical_cores > 64: sess_options.add_session_config_entry("session.use_lock_free_queue", "1") print("Enabled lock-free queues for high-core system") # Disable profiling in production for best performance sess_options.enable_profiling = False # === EXECUTION PROVIDER CONFIGURATION === providers = [] provider_options = [] if self.device == 'cuda:0' and ort.get_device() == 'GPU': # CUDA EP with optimizations cuda_options = { 'device_id': 0, 'arena_extend_strategy': 'kNextPowerOfTwo', # Better memory allocation 'gpu_mem_limit': 2 * 1024 * 1024 * 1024, # 2GB limit 'cudnn_conv_algo_search': 'EXHAUSTIVE', # Find best conv algorithms 'do_copy_in_default_stream': True # Better stream utilization } providers.append('CUDAExecutionProvider') provider_options.append(cuda_options) print("CUDA EP configured with optimizations") # CPU EP with OpenMP optimizations (always fallback) cpu_options = { 'use_arena': True, 'arena_extend_strategy': 'kSameAsRequested' } providers.append('CPUExecutionProvider') provider_options.append(cpu_options) # === SET OPENMP ENVIRONMENT VARIABLES FOR OPTIMAL CPU PERFORMANCE === # These should ideally be set before importing onnxruntime, but we set them anyway os.environ['OMP_NUM_THREADS'] = str(intra_threads) os.environ['OMP_WAIT_POLICY'] = 'ACTIVE' # Don't yield CPU, faster inference os.environ['OMP_MAX_ACTIVE_LEVELS'] = '1' # Tương đương disable nested parallelism (max levels = 1) # For Intel CPUs: compact affinity for better cache usage os.environ['KMP_AFFINITY'] = 'granularity=fine,compact,1,0' print(f"OpenMP configuration: threads={intra_threads}, policy=ACTIVE") # === CREATE OPTIMIZED SESSION === self.session = ort.InferenceSession( self.model_path, sess_options=sess_options, providers=providers, provider_options=provider_options ) # Get input/output info self.input_name = self.session.get_inputs()[0].name self.output_name = self.session.get_outputs()[0].name # Verify opset version (should be 21 for latest optimizations) try: # This might not always be available, but good to check model_meta = self.session.get_modelmeta() print(f"Model metadata - Domain: {getattr(model_meta, 'domain', 'N/A')}") except: pass provider_used = self.session.get_providers()[0] print(f"✅ ONNX Runtime v{ort.__version__} - Optimized session created") print(f"📈 Provider: {provider_used}") print(f"🧵 Threading: {intra_threads} intra-op threads, sequential execution") print(f"🚀 Optimizations: Graph=ALL, Memory=Enabled, Spinning=Enabled, Dynamic=Enabled") def detect(self, image: np.ndarray) -> Dict: """ Perform detection on image with maximum optimization Args: image: Input image as numpy array (BGR format) Returns: Dictionary containing detection results """ if self.model_path.endswith('.onnx'): return self._detect_onnx_optimized(image) else: return self._detect_pytorch(image) def _detect_pytorch(self, image: np.ndarray) -> Dict: """Detection using PyTorch model""" from ultralytics import YOLO results = self.model( image, conf=self.confidence, iou=self.iou_threshold, device=self.device if 'cuda' in self.device and torch.cuda.is_available() else 'cpu', verbose=False ) # Parse results detections = { 'boxes': [], 'confidences': [], 'classes': [], 'class_ids': [] } if len(results) > 0 and results[0].boxes is not None: boxes = results[0].boxes for box in boxes: x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() conf = float(box.conf[0].cpu().numpy()) cls_id = int(box.cls[0].cpu().numpy()) cls_name = self.classes[cls_id] if cls_id < len(self.classes) else f"class_{cls_id}" detections['boxes'].append([int(x1), int(y1), int(x2), int(y2)]) detections['confidences'].append(conf) detections['classes'].append(cls_name) detections['class_ids'].append(cls_id) return detections def _detect_onnx_optimized(self, image: np.ndarray) -> Dict: """Optimized ONNX detection with minimal overhead""" # Optimized preprocessing input_tensor, orig_height, orig_width = _preprocess_image_optimized(image) # Run inference (optimized session handles the rest) output = self.session.run([self.output_name], {self.input_name: input_tensor})[0] # Optimized output parsing detections = self._parse_yolo_output_optimized(output, orig_height, orig_width) return detections def _parse_yolo_output_optimized(self, output: np.ndarray, orig_height: int, orig_width: int) -> Dict: """Optimized YOLO output parsing for maximum performance""" # Output shape: [1, 84, 8400] -> transpose to [8400, 84] output = output[0].transpose(1, 0) num_classes = len(self.classes) x_factor = orig_width / 640.0 y_factor = orig_height / 640.0 # Vectorized operations for better performance class_scores = output[:, 4:4 + num_classes] max_confidences = np.max(class_scores, axis=1) # Filter by confidence threshold (vectorized) valid_indices = max_confidences > self.confidence if not np.any(valid_indices): return {'boxes': [], 'confidences': [], 'classes': [], 'class_ids': []} # Extract valid detections valid_output = output[valid_indices] valid_confidences = max_confidences[valid_indices] valid_class_ids = np.argmax(class_scores[valid_indices], axis=1) # Convert bounding boxes (vectorized) cx = valid_output[:, 0] * x_factor cy = valid_output[:, 1] * y_factor w = valid_output[:, 2] * x_factor h = valid_output[:, 3] * y_factor x1 = cx - w / 2 y1 = cy - h / 2 x2 = cx + w / 2 y2 = cy + h / 2 # Prepare for NMS boxes_for_nms = np.column_stack([x1, y1, w, h]) # Apply NMS if len(boxes_for_nms) > 0: indices = cv2.dnn.NMSBoxes( boxes_for_nms.tolist(), valid_confidences.tolist(), self.confidence, self.iou_threshold ) if len(indices) > 0: indices = indices.flatten() # Final results final_boxes = [[int(x1[i]), int(y1[i]), int(x2[i]), int(y2[i])] for i in indices] final_confs = [float(valid_confidences[i]) for i in indices] final_class_ids = [int(valid_class_ids[i]) for i in indices] final_classes = [ self.classes[cls_id] if cls_id < num_classes else f"class_{cls_id}" for cls_id in final_class_ids ] return { 'boxes': final_boxes, 'confidences': final_confs, 'classes': final_classes, 'class_ids': final_class_ids } return {'boxes': [], 'confidences': [], 'classes': [], 'class_ids': []} def detect_batch(self, images: List[np.ndarray]) -> List[Dict]: """Optimized batch detection""" if self.model_path.endswith('.onnx'): return self._detect_batch_onnx_optimized(images) else: return [self.detect(img) for img in images] def _detect_batch_onnx_optimized(self, images: List[np.ndarray]) -> List[Dict]: """Batch processing for ONNX with memory optimization""" results = [] # Process images in optimal batch sizes to balance memory and performance batch_size = min(4, len(images)) # Limit batch size for memory efficiency for i in range(0, len(images), batch_size): batch_images = images[i:i + batch_size] batch_results = [] for img in batch_images: batch_results.append(self.detect(img)) results.extend(batch_results) return results def get_performance_info(self) -> Dict: """Get current performance configuration info""" info = { "model_path": self.model_path, "model_type": "ONNX" if self.model_path.endswith('.onnx') else "PyTorch", "onnx_version": ort.__version__ if hasattr(self, 'session') else None, "confidence_threshold": self.confidence, "iou_threshold": self.iou_threshold, "classes": self.classes } if hasattr(self, 'session'): info.update({ "providers": self.session.get_providers(), "optimization_level": "ORT_ENABLE_ALL", "memory_optimizations": "Enabled", "threading_optimizations": "Enabled", "dynamic_cost_model": "Enabled" }) return info