import numpy as np
from typing import Dict, List, Tuple
import cv2
from pathlib import Path
import torch
import yaml
import onnxruntime as ort
import os
import psutil
def _get_optimal_threads():
"""Calculate optimal thread count for current system"""
physical_cores = psutil.cpu_count(logical=False)
logical_cores = psutil.cpu_count(logical=True)
    # Intra-op threads = physical cores generally performs best;
    # fall back to 4 when psutil cannot determine the physical core count
    intra_threads = physical_cores if physical_cores else 4
print(f"System info: {physical_cores} physical cores, {logical_cores} logical cores")
print(f"Using {intra_threads} intra-op threads for optimal performance")
return intra_threads
def _preprocess_image_optimized(image: np.ndarray) -> Tuple[np.ndarray, int, int]:
"""Optimized preprocessing for minimal overhead"""
height, width = image.shape[:2]
    # Resize to the model's fixed 640x640 input; INTER_LINEAR balances speed and quality
    img_resized = cv2.resize(image, (640, 640), interpolation=cv2.INTER_LINEAR)
    # Convert BGR (OpenCV default) to the RGB channel order the model expects
    img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)
    # Normalize to [0, 1] and reorder HWC -> CHW, avoiding extra copies where possible
    img_normalized = img_rgb.astype(np.float32, copy=False) / 255.0
    img_transposed = np.transpose(img_normalized, (2, 0, 1))
img_batch = np.expand_dims(img_transposed, axis=0)
return img_batch, height, width
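# Shape sketch for the preprocessing step (illustrative values for a
# hypothetical 1080p BGR frame, not taken from this repo):
#   frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
#   tensor, h, w = _preprocess_image_optimized(frame)
#   tensor.shape == (1, 3, 640, 640); (h, w) == (1080, 1920)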
class YOLOv11Detector:
"""YOLOv11 detector optimized for ONNX Runtime v1.19 with opset 21"""
def __init__(self, config_path: str = "config.yaml"):
"""Initialize YOLOv11 detector with maximum ONNX Runtime optimizations"""
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
model_path = self.config['model']['path']
# Check which model file exists
if not Path(model_path).exists():
# Try to find available model files
model_dir = Path("models")
        if (model_dir / "best.pt").exists():
            model_path = str(model_dir / "best.pt")
            print("Using best.pt model from training")
        elif (model_dir / "last.pt").exists():
            model_path = str(model_dir / "last.pt")
            print("Using last.pt checkpoint model")
        elif (model_dir / "best.onnx").exists():
            model_path = str(model_dir / "best.onnx")
            print("Using best.onnx model")
        else:
            raise FileNotFoundError("No model files found in models/ directory!")
self.model_path = model_path
self.device = self.config['model']['device']
self.confidence = self.config['model']['confidence']
self.iou_threshold = self.config['model']['iou_threshold']
self.classes = self.config['detection']['classes']
# Load model based on extension
if self.model_path.endswith('.onnx'):
self._load_onnx_model_optimized() # Will use optimizations for ONNX models
else:
self._load_pytorch_model() # Keep original PyTorch logic
def _load_pytorch_model(self):
"""Load PyTorch model using Ultralytics"""
from ultralytics import YOLO
self.model = YOLO(self.model_path)
# Set model to appropriate device
if self.device == 'cuda:0' and torch.cuda.is_available():
self.model.to('cuda')
else:
self.model.to('cpu')
print(f"Loaded PyTorch model: {self.model_path} on device: {self.device}")
def _load_onnx_model_optimized(self):
"""Load ONNX model with MAXIMUM optimizations for v1.19 + opset 21"""
# Get optimal thread count
intra_threads = _get_optimal_threads()
# Configure MAXIMUM performance session options
sess_options = ort.SessionOptions()
# === GRAPH OPTIMIZATIONS (Level: ALL) ===
# Enable ALL optimizations including layout optimizations
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
# === THREADING OPTIMIZATIONS ===
# Intra-op parallelism (within operators) - use physical cores
sess_options.intra_op_num_threads = intra_threads
# Inter-op parallelism (between operators) - keep at 1 for sequential execution
# Sequential execution often performs better than parallel for single inference
sess_options.inter_op_num_threads = 1
sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
# === MEMORY OPTIMIZATIONS ===
# Enable memory pattern optimization (reduces memory allocation overhead)
sess_options.enable_mem_pattern = True
# Enable memory arena optimization (better memory reuse)
sess_options.enable_cpu_mem_arena = True
# === CPU PERFORMANCE OPTIMIZATIONS ===
# Allow threads to spin waiting for work (trades CPU for latency)
sess_options.add_session_config_entry("session.intra_op.allow_spinning", "1")
sess_options.add_session_config_entry("session.inter_op.allow_spinning", "1")
# Dynamic cost model for better load balancing (reduces latency variance)
# Best value for dynamic_block_base is 4 according to docs
sess_options.add_session_config_entry("session.intra_op.dynamic_block_base", "4")
# For systems with >64 logical cores, use lock-free queues
logical_cores = psutil.cpu_count(logical=True)
if logical_cores and logical_cores > 64:
sess_options.add_session_config_entry("session.use_lock_free_queue", "1")
print("Enabled lock-free queues for high-core system")
# Disable profiling in production for best performance
sess_options.enable_profiling = False
# === EXECUTION PROVIDER CONFIGURATION ===
providers = []
provider_options = []
if self.device == 'cuda:0' and ort.get_device() == 'GPU':
# CUDA EP with optimizations
cuda_options = {
'device_id': 0,
'arena_extend_strategy': 'kNextPowerOfTwo', # Better memory allocation
'gpu_mem_limit': 2 * 1024 * 1024 * 1024, # 2GB limit
'cudnn_conv_algo_search': 'EXHAUSTIVE', # Find best conv algorithms
'do_copy_in_default_stream': True # Better stream utilization
}
providers.append('CUDAExecutionProvider')
provider_options.append(cuda_options)
print("CUDA EP configured with optimizations")
        # CPU EP options (always registered as a fallback)
cpu_options = {
'use_arena': True,
'arena_extend_strategy': 'kSameAsRequested'
}
providers.append('CPUExecutionProvider')
provider_options.append(cpu_options)
# === SET OPENMP ENVIRONMENT VARIABLES FOR OPTIMAL CPU PERFORMANCE ===
# These should ideally be set before importing onnxruntime, but we set them anyway
os.environ['OMP_NUM_THREADS'] = str(intra_threads)
os.environ['OMP_WAIT_POLICY'] = 'ACTIVE' # Don't yield CPU, faster inference
        os.environ['OMP_MAX_ACTIVE_LEVELS'] = '1'  # Equivalent to disabling nested parallelism (max levels = 1)
# For Intel CPUs: compact affinity for better cache usage
os.environ['KMP_AFFINITY'] = 'granularity=fine,compact,1,0'
print(f"OpenMP configuration: threads={intra_threads}, policy=ACTIVE")
# === CREATE OPTIMIZED SESSION ===
self.session = ort.InferenceSession(
self.model_path,
sess_options=sess_options,
providers=providers,
provider_options=provider_options
)
# Get input/output info
self.input_name = self.session.get_inputs()[0].name
self.output_name = self.session.get_outputs()[0].name
        # Inspect model metadata (opset 21 is expected for the latest
        # optimizations, though the metadata API does not expose it directly)
        try:
            model_meta = self.session.get_modelmeta()
            print(f"Model metadata - Domain: {getattr(model_meta, 'domain', 'N/A')}")
        except Exception:
            pass
provider_used = self.session.get_providers()[0]
print(f"✅ ONNX Runtime v{ort.__version__} - Optimized session created")
print(f"📈 Provider: {provider_used}")
print(f"🧵 Threading: {intra_threads} intra-op threads, sequential execution")
print(f"🚀 Optimizations: Graph=ALL, Memory=Enabled, Spinning=Enabled, Dynamic=Enabled")
def detect(self, image: np.ndarray) -> Dict:
"""
Perform detection on image with maximum optimization
Args:
image: Input image as numpy array (BGR format)
Returns:
Dictionary containing detection results
"""
if self.model_path.endswith('.onnx'):
return self._detect_onnx_optimized(image)
else:
return self._detect_pytorch(image)
def _detect_pytorch(self, image: np.ndarray) -> Dict:
"""Detection using PyTorch model"""
results = self.model(
image,
conf=self.confidence,
iou=self.iou_threshold,
device=self.device if 'cuda' in self.device and torch.cuda.is_available() else 'cpu',
verbose=False
)
# Parse results
detections = {
'boxes': [],
'confidences': [],
'classes': [],
'class_ids': []
}
if len(results) > 0 and results[0].boxes is not None:
boxes = results[0].boxes
for box in boxes:
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
conf = float(box.conf[0].cpu().numpy())
cls_id = int(box.cls[0].cpu().numpy())
cls_name = self.classes[cls_id] if cls_id < len(self.classes) else f"class_{cls_id}"
detections['boxes'].append([int(x1), int(y1), int(x2), int(y2)])
detections['confidences'].append(conf)
detections['classes'].append(cls_name)
detections['class_ids'].append(cls_id)
return detections
def _detect_onnx_optimized(self, image: np.ndarray) -> Dict:
"""Optimized ONNX detection with minimal overhead"""
# Optimized preprocessing
input_tensor, orig_height, orig_width = _preprocess_image_optimized(image)
# Run inference (optimized session handles the rest)
output = self.session.run([self.output_name], {self.input_name: input_tensor})[0]
# Optimized output parsing
detections = self._parse_yolo_output_optimized(output, orig_height, orig_width)
return detections
def _parse_yolo_output_optimized(self, output: np.ndarray, orig_height: int, orig_width: int) -> Dict:
"""Optimized YOLO output parsing for maximum performance"""
        # Output shape: [1, 4 + num_classes, 8400] (84 x 8400 for 80 classes)
        # -> transpose to [8400, 4 + num_classes]
        output = output[0].transpose(1, 0)
num_classes = len(self.classes)
x_factor = orig_width / 640.0
y_factor = orig_height / 640.0
# Vectorized operations for better performance
class_scores = output[:, 4:4 + num_classes]
max_confidences = np.max(class_scores, axis=1)
# Filter by confidence threshold (vectorized)
valid_indices = max_confidences > self.confidence
if not np.any(valid_indices):
return {'boxes': [], 'confidences': [], 'classes': [], 'class_ids': []}
# Extract valid detections
valid_output = output[valid_indices]
valid_confidences = max_confidences[valid_indices]
valid_class_ids = np.argmax(class_scores[valid_indices], axis=1)
# Convert bounding boxes (vectorized)
cx = valid_output[:, 0] * x_factor
cy = valid_output[:, 1] * y_factor
w = valid_output[:, 2] * x_factor
h = valid_output[:, 3] * y_factor
x1 = cx - w / 2
y1 = cy - h / 2
x2 = cx + w / 2
y2 = cy + h / 2
# Prepare for NMS
boxes_for_nms = np.column_stack([x1, y1, w, h])
# Apply NMS
if len(boxes_for_nms) > 0:
indices = cv2.dnn.NMSBoxes(
boxes_for_nms.tolist(),
valid_confidences.tolist(),
self.confidence,
self.iou_threshold
)
if len(indices) > 0:
indices = indices.flatten()
# Final results
final_boxes = [[int(x1[i]), int(y1[i]), int(x2[i]), int(y2[i])] for i in indices]
final_confs = [float(valid_confidences[i]) for i in indices]
final_class_ids = [int(valid_class_ids[i]) for i in indices]
final_classes = [
self.classes[cls_id] if cls_id < num_classes else f"class_{cls_id}"
for cls_id in final_class_ids
]
return {
'boxes': final_boxes,
'confidences': final_confs,
'classes': final_classes,
'class_ids': final_class_ids
}
return {'boxes': [], 'confidences': [], 'classes': [], 'class_ids': []}
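    # Return-format sketch (illustrative values, not real model output):
    #   {'boxes': [[120, 45, 340, 220]], 'confidences': [0.87],
    #    'classes': ['car'], 'class_ids': [0]}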
def detect_batch(self, images: List[np.ndarray]) -> List[Dict]:
"""Optimized batch detection"""
if self.model_path.endswith('.onnx'):
return self._detect_batch_onnx_optimized(images)
else:
return [self.detect(img) for img in images]
    def _detect_batch_onnx_optimized(self, images: List[np.ndarray]) -> List[Dict]:
        """Chunked sequential processing for the ONNX path.

        Note: this path still runs one image per session call; chunking only
        bounds Python-side memory, it does not batch the ONNX forward pass.
        """
        results = []
        # Process images in small chunks to balance memory use and throughput
        batch_size = min(4, len(images))
for i in range(0, len(images), batch_size):
batch_images = images[i:i + batch_size]
batch_results = []
for img in batch_images:
batch_results.append(self.detect(img))
results.extend(batch_results)
return results
def get_performance_info(self) -> Dict:
"""Get current performance configuration info"""
info = {
"model_path": self.model_path,
"model_type": "ONNX" if self.model_path.endswith('.onnx') else "PyTorch",
"onnx_version": ort.__version__ if hasattr(self, 'session') else None,
"confidence_threshold": self.confidence,
"iou_threshold": self.iou_threshold,
"classes": self.classes
}
if hasattr(self, 'session'):
info.update({
"providers": self.session.get_providers(),
"optimization_level": "ORT_ENABLE_ALL",
"memory_optimizations": "Enabled",
"threading_optimizations": "Enabled",
"dynamic_cost_model": "Enabled"
})
return info
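
# Minimal usage sketch. Assumes a config.yaml with the keys read in
# __init__ (model.path, model.device, model.confidence, model.iou_threshold,
# detection.classes) and a hypothetical test image at "test.jpg".
if __name__ == "__main__":
    detector = YOLOv11Detector("config.yaml")
    frame = cv2.imread("test.jpg")  # BGR image, as detect() expects
    if frame is not None:
        result = detector.detect(frame)
        print(f"Found {len(result['boxes'])} detections")
        for box, conf, name in zip(result['boxes'],
                                   result['confidences'],
                                   result['classes']):
            print(f"  {name}: {conf:.2f} at {box}")
    print(detector.get_performance_info())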