yolocar / main.py
minh9972t12's picture
Update main.py
e83afa6
raw
history blame
27.6 kB
import io
from typing import List
import uvicorn
import numpy as np
import uuid
from datetime import datetime
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from PIL import Image
import cv2
import yaml
from src.detection import YOLOv11Detector
from src.comparison import DamageComparator
from src.visualization import DamageVisualizer
from pathlib import Path
app = FastAPI(
title="Car Damage Detection API",
description="YOLOv11-based car damage detection and comparison system with PyTorch and ONNX support (Optimized for ONNX Runtime v1.19 + opset 21)",
version="1.2.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize components
detector = None
comparator = DamageComparator()
visualizer = DamageVisualizer()
# Model paths mapping - PT and ONNX versions
MODEL_PATHS = {
# PT models (original)
0: "models_small/best.pt", # Small v1 PT
1: "models_small_version_2/best.pt", # Small v2 PT
2: "models_medium/best.pt", # Medium v1 PT
3: "models_medium_version_2/best.pt", # Medium v2 PT
# ONNX models (optimized with v1.19 + opset 21)
4: "models_small/best.onnx", # Small v1 ONNX
5: "models_small_version_2/best.onnx", # Small v2 ONNX
6: "models_medium/best.onnx", # Medium v1 ONNX
7: "models_medium_version_2/best.onnx" # Medium v2 ONNX
}
# Config paths - ONNX uses same config as PT version
CONFIG_PATHS = {
0: "config.yaml", # Small v1 PT
1: "config_version2.yaml", # Small v2 PT
2: "config.yaml", # Medium v1 PT
3: "config_version2.yaml", # Medium v2 PT
4: "config.yaml", # Small v1 ONNX
5: "config_version2.yaml", # Small v2 ONNX
6: "config.yaml", # Medium v1 ONNX
7: "config_version2.yaml" # Medium v2 ONNX
}
# Mapping from PT index to ONNX index
PT_TO_ONNX_MAPPING = {
0: 4, # Small v1 -> ONNX
1: 5, # Small v2 -> ONNX
2: 6, # Medium v1 -> ONNX
3: 7, # Medium v2 -> ONNX
4: None # Large has no ONNX
}
def get_optimal_model_index(select_models: int, prefer_onnx: bool = True) -> int:
"""
Enhanced model selection with performance optimization info
"""
# If user explicitly selects ONNX index (5..8) => use that ONNX with optimizations
if select_models in (4, 5, 6, 7):
onnx_path = Path(MODEL_PATHS.get(select_models, ""))
if not onnx_path.exists():
raise FileNotFoundError(
f"Requested ONNX model index {select_models} not found at {MODEL_PATHS.get(select_models)}")
print(f"πŸš€ Selected ONNX model with MAXIMUM optimizations: {MODEL_PATHS[select_models]}")
return select_models
# Normalize to valid PT indices
if select_models not in (0, 1, 2, 3):
select_models = 2 # default to medium v1
# PT preferred for 0..4
pt_path = Path(MODEL_PATHS.get(select_models, ""))
if pt_path.exists():
print(f"πŸ“¦ Selected PyTorch model: {MODEL_PATHS[select_models]}")
return select_models
# If PT not found and prefer_onnx: fallback to ONNX with optimizations
onnx_index = PT_TO_ONNX_MAPPING.get(select_models)
if prefer_onnx and onnx_index is not None:
onnx_path = Path(MODEL_PATHS.get(onnx_index, ""))
if onnx_path.exists():
print(f"PT not found at {pt_path}, falling back to optimized ONNX {MODEL_PATHS[onnx_index]}")
return onnx_index
# No suitable file found
raise FileNotFoundError(f"Requested PT model index {select_models} not found at {MODEL_PATHS.get(select_models)}")
def load_detector(select_models: int = 2, prefer_onnx: bool = True):
"""
Args:
select_models: Model selection
prefer_onnx: Whether to prefer ONNX format for fallback
"""
global detector, comparator, visualizer
actual_model_index = get_optimal_model_index(select_models, prefer_onnx)
# Get appropriate config file
config_file = CONFIG_PATHS.get(actual_model_index, "config.yaml")
# Load config
with open(config_file, 'r') as f:
config = yaml.safe_load(f)
# Update config with selected model path
config['model']['path'] = MODEL_PATHS[actual_model_index]
# Save updated config to temp file
temp_config = f'temp_config_{actual_model_index}.yaml'
with open(temp_config, 'w') as f:
yaml.dump(config, f, default_flow_style=False)
# Reload all components with new config
detector = YOLOv11Detector(config_path=temp_config)
comparator = DamageComparator(config_path=temp_config)
visualizer = DamageVisualizer(config_path=temp_config)
# Log model info with optimization status
model_type = "ONNX" if MODEL_PATHS[actual_model_index].endswith('.onnx') else "PyTorch"
model_labels = [
"Small v1", "Small v2", "Medium v1", "Medium v2",
"Small v1 ONNX", "Small v2 ONNX", "Medium v1 ONNX", "Medium v2 ONNX"
]
if 0 <= select_models < len(model_labels):
model_size = model_labels[select_models]
else:
raise ValueError(f"select_models={select_models} must be 0-7")
# Enhanced logging for optimization status
optimization_status = "πŸš€ MAXIMUM OPTIMIZATIONS" if model_type == "ONNX" else "πŸ“¦ Standard PyTorch"
print(f"Loaded {model_size} model in {model_type} format - {optimization_status}")
# Show performance info for ONNX models
if model_type == "ONNX" and hasattr(detector, 'get_performance_info'):
perf_info = detector.get_performance_info()
if 'providers' in perf_info:
print(f"Provider: {perf_info['providers'][0]}")
if 'optimization_level' in perf_info:
print(f"Graph optimizations: {perf_info['optimization_level']}")
return detector
# Initialize default detector with medium model (preferring ONNX for performance)
print("πŸš€ Initializing API with optimized ONNX Runtime support...")
detector = load_detector(2, prefer_onnx=True)
comparator = DamageComparator(config_path=CONFIG_PATHS[2])
visualizer = DamageVisualizer(config_path=CONFIG_PATHS[2])
# Create necessary directories
UPLOADS_DIR = Path("uploads")
RESULTS_DIR = Path("results")
UPLOADS_DIR.mkdir(exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)
# Mount static files directory
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
@app.get("/")
async def root():
"""Root endpoint with enhanced model info"""
return {
"message": "Car Damage Detection API with YOLOv11 (ONNX Runtime v1.19 optimized)",
"version": "1.2.0",
"optimizations": {
"onnx_runtime": "v1.19+ with opset 21 support",
"performance_features": [
"Graph optimizations (ALL level)",
"Dynamic thread pool with load balancing",
"Memory arena optimizations",
"CPU spinning for low latency",
"OpenMP with ACTIVE wait policy"
]
},
"model_options": {
"0": "Small model v1 (PyTorch)",
"1": "Small model v2 (PyTorch)",
"2": "Medium model v1 (PyTorch)",
"3": "Medium model v2 (PyTorch)",
"4": "Large model (PyTorch only)",
"5": "Small model v1 (ONNX - OPTIMIZED)",
"6": "Small model v2 (ONNX - OPTIMIZED)",
"7": "Medium model v1 (ONNX - OPTIMIZED)",
"8": "Medium model v2 (ONNX - OPTIMIZED)"
},
"recommendation": "Use indices 5-8 for maximum performance with ONNX optimizations",
"endpoints": {
"/docs": "API documentation",
"/detect": "Single/Multi image detection",
"/compare": "Compare before/after images (6 pairs)",
"/uploads/{filename}": "Access saved visualization images",
"/health": "Health check",
"/model-info": "Get current model information",
"/performance-info": "Get optimization details"
}
}
@app.get("/health")
async def health_check():
"""Enhanced health check with optimization status"""
health_info = {
"status": "healthy",
"model": "YOLOv11",
"backend": "ONNX/PyTorch"
}
if detector and hasattr(detector, 'get_performance_info'):
perf_info = detector.get_performance_info()
health_info.update({
"model_type": perf_info.get("model_type", "Unknown"),
"optimization_status": "Optimized" if perf_info.get("model_type") == "ONNX" else "Standard"
})
return health_info
@app.get("/model-info")
async def get_model_info():
"""Get comprehensive information about currently loaded model"""
if detector is None:
return {"error": "No model loaded"}
model_path = detector.model_path
model_type = "ONNX" if model_path.endswith('.onnx') else "PyTorch"
info = {
"model_path": model_path,
"model_type": model_type,
"confidence_threshold": detector.confidence,
"iou_threshold": detector.iou_threshold,
"classes": detector.classes,
"optimization_status": "Optimized" if model_type == "ONNX" else "Standard"
}
# Add detailed performance info for ONNX models
if hasattr(detector, 'get_performance_info'):
perf_info = detector.get_performance_info()
info.update(perf_info)
return info
@app.get("/performance-info")
async def get_performance_info():
"""Get detailed optimization and performance information"""
if detector is None:
return {"error": "No model loaded"}
if hasattr(detector, 'get_performance_info'):
return detector.get_performance_info()
else:
return {
"model_type": "PyTorch",
"optimization_level": "Standard",
"note": "Performance optimizations available for ONNX models only"
}
@app.post("/detect")
async def detect_single_image(
file: UploadFile = File(None),
files: List[UploadFile] = File(None),
select_models: int = Form(2),
prefer_onnx: bool = Form(True)
):
"""
Multi-view detection with ONNX Runtime optimizations
Args:
file: Single image (backward compatibility)
files: Multiple images for multi-view detection
select_models: Model selection
- 0-4: PyTorch models (standard performance)
- 5-8: ONNX models (maximum optimizations)
prefer_onnx: Whether to prefer ONNX format (default: True for better performance)
"""
try:
# Validate select_models
if select_models not in list(range(0, 8)):
raise HTTPException(status_code=400,
detail="select_models must be 0-8 (0-4=PyTorch, 5-8=ONNX optimized)")
# Load appropriate detector
current_detector = load_detector(select_models, prefer_onnx)
# Case 1: Single image (backward compatibility)
if file is not None:
contents = await file.read()
image = Image.open(io.BytesIO(contents)).convert("RGB")
image_np = np.array(image)
image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
# Perform detection
detections = current_detector.detect(image_bgr)
# Create visualization
visualized = visualizer.draw_detections(image_bgr, detections, 'new_damage')
# Save and return
filename = f"detection_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.jpg"
output_path = UPLOADS_DIR / filename
cv2.imwrite(str(output_path), visualized)
# Enhanced response with optimization info
model_type = "ONNX" if current_detector.model_path.endswith('.onnx') else "PyTorch"
optimization_status = "πŸš€ OPTIMIZED" if model_type == "ONNX" else "πŸ“¦ Standard"
return JSONResponse({
"status": "success",
"model_type": model_type,
"optimization_status": optimization_status,
"detections": detections,
"statistics": {
"total_damages": len(detections['boxes']),
"damage_types": list(set(detections['classes']))
},
"visualized_image_path": f"uploads/{filename}",
"visualized_image_url": f"http://localhost:8000/uploads/{filename}",
"filename": filename,
"performance_note": "Using ONNX optimizations" if model_type == "ONNX" else "Consider using ONNX models (5-8) for better performance"
})
# Case 2: Multiple images - MULTI-VIEW DETECTION with ReID
elif files is not None and len(files) > 0:
print(f"\nMulti-view detection with {len(files)} images")
images_list = []
detections_list = []
# Process all images
for idx, img_file in enumerate(files):
contents = await img_file.read()
image = Image.open(io.BytesIO(contents)).convert("RGB")
image_np = np.array(image)
image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
images_list.append(image_bgr)
detections = current_detector.detect(image_bgr)
detections_list.append(detections)
print(f" View {idx + 1}: {len(detections['boxes'])} detections")
# DEDUPLICATION using ReID
print("\nPerforming cross-view deduplication...")
unique_damages = comparator.deduplicate_detections_across_views(
detections_list, images_list
)
# Create combined visualization
combined_height = max(img.shape[0] for img in images_list)
combined_width = sum(img.shape[1] for img in images_list)
combined_img = np.ones((combined_height, combined_width, 3), dtype=np.uint8) * 255
x_offset = 0
for img_idx, (image, detections) in enumerate(zip(images_list, detections_list)):
# Resize if needed
h, w = image.shape[:2]
if h != combined_height:
scale = combined_height / h
new_w = int(w * scale)
image = cv2.resize(image, (new_w, combined_height))
w = new_w
# Draw on combined image
combined_img[:, x_offset:x_offset + w] = image
# Draw detections with unique IDs
for det_idx, bbox in enumerate(detections['boxes']):
# Find unique damage ID for this detection
damage_id = None
for uid, damage_info in unique_damages.items():
for d in damage_info['detections']:
if d['view_idx'] == img_idx and d['bbox'] == bbox:
damage_id = uid
break
# Draw with unique ID
x1, y1, x2, y2 = bbox
x1 += x_offset
x2 += x_offset
# Color based on unique ID
if damage_id:
color_hash = int(damage_id[-6:], 16)
color = ((color_hash >> 16) & 255, (color_hash >> 8) & 255, color_hash & 255)
else:
color = (0, 0, 255)
cv2.rectangle(combined_img, (x1, y1), (x2, y2), color, 2)
# Label
label = f"{damage_id[:8] if damage_id else 'Unknown'}"
cv2.putText(combined_img, label, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
x_offset += w
# Save combined visualization
filename = f"multiview_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.jpg"
output_path = UPLOADS_DIR / filename
cv2.imwrite(str(output_path), combined_img)
# Return results with optimization info
total_detections = sum(len(d['boxes']) for d in detections_list)
model_type = "ONNX" if current_detector.model_path.endswith('.onnx') else "PyTorch"
optimization_status = "πŸš€ OPTIMIZED" if model_type == "ONNX" else "πŸ“¦ Standard"
return JSONResponse({
"status": "success",
"mode": "multi_view_with_reid",
"model_type": model_type,
"optimization_status": optimization_status,
"total_detections_all_views": total_detections,
"unique_damages_count": len(unique_damages),
"unique_damages": {
damage_id: {
"appears_in_views": info['views'],
"class": info['class'],
"avg_confidence": float(info['avg_confidence']),
"detection_count": len(info['detections'])
}
for damage_id, info in unique_damages.items()
},
"reduction_rate": f"{(1 - len(unique_damages) / total_detections) * 100:.1f}%" if total_detections > 0 else "0%",
"visualized_image_path": f"uploads/{filename}",
"visualized_image_url": f"http://localhost:8000/uploads/{filename}",
"message": f"Detected {total_detections} damages across {len(files)} views, "
f"identified {len(unique_damages)} unique damages using ReID",
"performance_note": "Using ONNX optimizations" if model_type == "ONNX" else "Consider using ONNX models (5-8) for better performance"
})
else:
raise HTTPException(status_code=400, detail="No image provided")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Detection failed: {str(e)}")
@app.post("/compare")
async def compare_vehicle_damages(
# Before delivery images (6 positions)
before_1: UploadFile = File(...),
before_2: UploadFile = File(...),
before_3: UploadFile = File(...),
before_4: UploadFile = File(...),
before_5: UploadFile = File(...),
before_6: UploadFile = File(...),
# After delivery images (6 positions)
after_1: UploadFile = File(...),
after_2: UploadFile = File(...),
after_3: UploadFile = File(...),
after_4: UploadFile = File(...),
after_5: UploadFile = File(...),
after_6: UploadFile = File(...),
# Model selection
select_models: int = Form(2),
prefer_onnx: bool = Form(True)
):
"""
Enhanced comparison with ONNX Runtime optimizations and ReID
Args:
before_1-6: Before delivery images from 6 positions
after_1-6: After delivery images from 6 positions
select_models: Model selection (0-4=PyTorch, 5-8=ONNX optimized)
prefer_onnx: Whether to prefer ONNX format (default: True)
"""
try:
# Validate select_models
if select_models not in list(range(0, 8)):
raise HTTPException(status_code=400,
detail="select_models must be 0-8 (0-4=PyTorch, 5-8=ONNX optimized)")
# Load appropriate detector
current_detector = load_detector(select_models, prefer_onnx)
before_images = [before_1, before_2, before_3, before_4, before_5, before_6]
after_images = [after_1, after_2, after_3, after_4, after_5, after_6]
position_results = []
all_visualizations = []
image_pairs = []
# Collect all before/after images and detections
all_before_images = []
all_after_images = []
all_before_detections = []
all_after_detections = []
# Overall statistics
total_new_damages = 0
total_existing_damages = 0
total_matched_damages = 0
session_id = str(uuid.uuid4())[:8]
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
# Process each position pair
for i in range(6):
before_contents = await before_images[i].read()
after_contents = await after_images[i].read()
before_img = Image.open(io.BytesIO(before_contents)).convert("RGB")
after_img = Image.open(io.BytesIO(after_contents)).convert("RGB")
before_np = np.array(before_img)
after_np = np.array(after_img)
before_bgr = cv2.cvtColor(before_np, cv2.COLOR_RGB2BGR)
after_bgr = cv2.cvtColor(after_np, cv2.COLOR_RGB2BGR)
# Store for multi-view analysis
all_before_images.append(before_bgr)
all_after_images.append(after_bgr)
image_pairs.append((before_bgr, after_bgr))
# Detect damages
before_detections = current_detector.detect(before_bgr)
after_detections = current_detector.detect(after_bgr)
all_before_detections.append(before_detections)
all_after_detections.append(after_detections)
# Enhanced comparison with ReID
comparison = comparator.analyze_damage_status(
before_detections, after_detections,
before_bgr, after_bgr
)
# Update statistics
total_new_damages += len(comparison['new_damages'])
total_existing_damages += len(comparison['repaired_damages'])
total_matched_damages += len(comparison['matched_damages'])
# Create visualization
vis_img = visualizer.create_comparison_visualization(
before_bgr, after_bgr,
before_detections, after_detections,
comparison
)
vis_filename = f"comparison_{timestamp_str}_{session_id}_pos{i + 1}.jpg"
vis_path = UPLOADS_DIR / vis_filename
cv2.imwrite(str(vis_path), vis_img)
vis_url = f"http://localhost:8000/uploads/{vis_filename}"
all_visualizations.append(vis_url)
# Store position result with ReID info
position_results.append({
f"position_{i + 1}": {
"case": comparison['case'],
"message": comparison['message'],
"statistics": comparison['statistics'],
"new_damages": comparison['new_damages'],
"matched_damages": comparison['matched_damages'],
"repaired_damages": comparison['repaired_damages'],
"using_reid": comparison['statistics'].get('using_reid', True),
"visualization_path": f"uploads/{vis_filename}",
"visualization_url": vis_url,
"filename": vis_filename
}
})
# Deduplicate BEFORE damages across all 6 views
unique_before = comparator.deduplicate_detections_across_views(
all_before_detections, all_before_images
)
# Deduplicate AFTER damages across all 6 views
unique_after = comparator.deduplicate_detections_across_views(
all_after_detections, all_after_images
)
print(f"Before: {sum(len(d['boxes']) for d in all_before_detections)} detections β†’ {len(unique_before)} unique")
print(f"After: {sum(len(d['boxes']) for d in all_after_detections)} detections β†’ {len(unique_after)} unique")
# Determine overall case with deduplication
actual_new_damages = len(unique_after) - len(unique_before)
overall_case = "CASE_3_SUCCESS"
overall_message = "Successful delivery - No damage detected"
if actual_new_damages > 0:
overall_case = "CASE_2_NEW_DAMAGE"
overall_message = f"Error during delivery - {actual_new_damages} new unique damage(s) detected"
elif len(unique_before) > 0 and actual_new_damages <= 0:
overall_case = "CASE_1_EXISTING"
overall_message = "Existing damages from beginning β†’ Delivery completed"
# Create summary grid
grid_results = [res[f"position_{i + 1}"] for i, res in enumerate(position_results)]
grid_img = visualizer.create_summary_grid(grid_results, image_pairs)
grid_filename = f"summary_grid_{timestamp_str}_{session_id}.jpg"
grid_path = UPLOADS_DIR / grid_filename
cv2.imwrite(str(grid_path), grid_img)
grid_url = f"http://localhost:8000/uploads/{grid_filename}"
timestamp = datetime.now().isoformat()
# Enhanced response with optimization info
model_type = "ONNX" if current_detector.model_path.endswith('.onnx') else "PyTorch"
optimization_status = "πŸš€ OPTIMIZED" if model_type == "ONNX" else "πŸ“¦ Standard"
return JSONResponse({
"status": "success",
"session_id": session_id,
"timestamp": timestamp,
"model_type": model_type,
"optimization_status": optimization_status,
"reid_enabled": True,
"overall_result": {
"case": overall_case,
"message": overall_message,
"statistics": {
"total_new_damages": int(total_new_damages),
"total_matched_damages": int(total_matched_damages),
"total_repaired_damages": int(total_existing_damages),
"unique_damages_before": int(len(unique_before)),
"unique_damages_after": int(len(unique_after)),
"actual_new_unique_damages": int(max(0, len(unique_after) - len(unique_before)))
}
},
"deduplication_info": {
"before_total_detections": int(sum(len(d['boxes']) for d in all_before_detections)),
"before_unique_damages": int(len(unique_before)),
"after_total_detections": int(sum(len(d['boxes']) for d in all_after_detections)),
"after_unique_damages": int(len(unique_after)),
"duplicate_reduction_rate": f"{(1 - len(unique_after) / sum(len(d['boxes']) for d in all_after_detections)) * 100:.1f}%"
if sum(len(d['boxes']) for d in all_after_detections) > 0 else "0%"
},
"position_results": position_results,
"summary_visualization_path": f"uploads/{grid_filename}",
"summary_visualization_url": grid_url,
"all_visualizations": all_visualizations,
"recommendations": {
"action_required": bool(actual_new_damages > 0),
"suggested_action": "Investigate delivery process" if actual_new_damages > 0
else "Proceed with delivery completion"
},
"performance_note": "Using ONNX optimizations" if model_type == "ONNX" else "Consider using ONNX models (5-8) for better performance"
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Comparison failed: {str(e)}")
if __name__ == "__main__":
import os
uvicorn.run(
"main:app",
host="0.0.0.0",
port=int(os.environ.get("PORT", 7860)),
reload=False,
log_level="info"
)