"""FastAPI service for YOLOv11-based car damage detection and comparison.

Endpoints:
    GET  /         - endpoint index
    GET  /health   - health check
    POST /detect   - single-image or multi-view detection
    POST /compare  - before/after comparison over 6 image pairs
    /uploads/...   - static access to saved visualization images
"""
import io
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import List

import cv2
import numpy as np
import uvicorn
import yaml
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from PIL import Image

from src.detection import YOLOv11Detector
from src.comparison import DamageComparator
from src.visualization import DamageVisualizer

app = FastAPI(
    title="Car Damage Detection API",
    description="YOLOv11-based car damage detection and comparison system",
    version="1.0.0"
)

# Add CORS middleware.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation; confirm this is intended
# before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Base URL used to build absolute links to saved visualizations. Defaults to
# the previously hard-coded value; override with the PUBLIC_BASE_URL
# environment variable when the service is reachable under another host/port.
PUBLIC_BASE_URL = os.environ.get("PUBLIC_BASE_URL", "http://localhost:8000").rstrip("/")

# Box colour (BGR) for detections that have no usable unique damage ID.
UNMATCHED_COLOR = (0, 0, 255)

# Initialize components
detector = None
_loaded_model_index = None  # key of MODEL_PATHS that `detector` was built for
comparator = DamageComparator()
visualizer = DamageVisualizer()

# Model paths mapping
MODEL_PATHS = {
    0: "models_small/best.pt",
    1: "models_medium/best.pt",
    2: "models_large/best.pt"
}


def load_detector(select_models: int = 2):
    """Return a detector for the selected model, loading it only when needed.

    The selected model path is written into ``config.yaml`` and a new
    ``YOLOv11Detector`` is constructed (presumably it reads that file on
    construction -- confirm against ``src.detection``). If the requested
    model is the one already loaded, the existing detector is returned and
    ``config.yaml`` is left untouched.

    Args:
        select_models: Key into ``MODEL_PATHS`` (0=small, 1=medium, 2=large).
            Unknown keys fall back to model 2.

    Returns:
        The module-level detector instance.
    """
    global detector, _loaded_model_index

    model_index = select_models if select_models in MODEL_PATHS else 2
    if detector is not None and _loaded_model_index == model_index:
        return detector

    # Update config with selected model path
    with open('config.yaml', 'r') as f:
        config = yaml.safe_load(f)

    config['model']['path'] = MODEL_PATHS[model_index]

    with open('config.yaml', 'w') as f:
        yaml.dump(config, f, default_flow_style=False)

    detector = YOLOv11Detector()
    _loaded_model_index = model_index
    return detector


detector = load_detector(2)

# Create necessary directories
UPLOADS_DIR = Path("uploads")
RESULTS_DIR = Path("results")
UPLOADS_DIR.mkdir(exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)

# Mount static files directory
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")


def _upload_url(filename: str) -> str:
    """Build the absolute URL under which a saved upload is served."""
    return f"{PUBLIC_BASE_URL}/uploads/{filename}"


def _validate_model_choice(select_models: int) -> None:
    """Raise HTTP 400 unless ``select_models`` is a key of ``MODEL_PATHS``."""
    if select_models not in MODEL_PATHS:
        raise HTTPException(status_code=400, detail="select_models must be 0, 1, or 2")


async def _read_image_bgr(upload: UploadFile) -> np.ndarray:
    """Read an uploaded file and decode it into a BGR image array.

    Raises:
        HTTPException: 400 when Pillow cannot decode the uploaded bytes.
    """
    contents = await upload.read()
    try:
        image = Image.open(io.BytesIO(contents)).convert("RGB")
    except OSError as exc:  # includes PIL.UnidentifiedImageError
        raise HTTPException(
            status_code=400,
            detail=f"Invalid or unreadable image: {upload.filename}"
        ) from exc
    return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)


def _find_damage_id(unique_damages, view_idx, bbox):
    """Return the unique damage ID owning ``bbox`` in view ``view_idx``, or None."""
    target = list(bbox)
    for uid, damage_info in unique_damages.items():
        for det in damage_info['detections']:
            if det['view_idx'] == view_idx and list(det['bbox']) == target:
                return uid
    return None


def _color_for_damage_id(damage_id):
    """Derive a colour triple from the last 6 hex characters of a damage ID.

    Falls back to ``UNMATCHED_COLOR`` when there is no ID or its suffix is
    not valid hexadecimal.
    """
    if not damage_id:
        return UNMATCHED_COLOR
    try:
        color_hash = int(str(damage_id)[-6:], 16)
    except ValueError:
        return UNMATCHED_COLOR
    return ((color_hash >> 16) & 255, (color_hash >> 8) & 255, color_hash & 255)


def _build_multiview_canvas(images_list, detections_list, unique_damages):
    """Place all views side by side and draw each detection with its unique ID.

    Every view is scaled to the tallest view's height. The canvas width is
    computed from the *resized* widths, and box coordinates are scaled by the
    same factor as their view so they stay aligned with the pixels.
    """
    canvas_height = max(img.shape[0] for img in images_list)

    # Resize first so the canvas can be sized from the final panel widths.
    panels = []
    for image in images_list:
        h, w = image.shape[:2]
        scale = canvas_height / h
        if h != canvas_height:
            image = cv2.resize(image, (int(w * scale), canvas_height))
        panels.append((image, scale))

    canvas_width = sum(panel.shape[1] for panel, _ in panels)
    canvas = np.full((canvas_height, canvas_width, 3), 255, dtype=np.uint8)

    x_offset = 0
    for view_idx, ((panel, scale), detections) in enumerate(zip(panels, detections_list)):
        panel_width = panel.shape[1]
        canvas[:, x_offset:x_offset + panel_width] = panel

        for bbox in detections['boxes']:
            damage_id = _find_damage_id(unique_damages, view_idx, bbox)
            color = _color_for_damage_id(damage_id)

            # OpenCV drawing calls need integer pixel coordinates.
            x1, y1, x2, y2 = (int(round(coord * scale)) for coord in bbox)
            x1 += x_offset
            x2 += x_offset

            cv2.rectangle(canvas, (x1, y1), (x2, y2), color, 2)

            label = f"{str(damage_id)[:8] if damage_id else 'Unknown'}"
            cv2.putText(canvas, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        x_offset += panel_width

    return canvas


async def _detect_single_view(current_detector, file: UploadFile) -> JSONResponse:
    """Run detection on one image, save the visualization, return the result."""
    image_bgr = await _read_image_bgr(file)

    # Perform detection
    detections = current_detector.detect(image_bgr)

    # Create visualization
    visualized = visualizer.draw_detections(image_bgr, detections, 'new_damage')

    # Save and return
    filename = f"detection_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.jpg"
    cv2.imwrite(str(UPLOADS_DIR / filename), visualized)

    return JSONResponse({
        "status": "success",
        "detections": detections,
        "statistics": {
            "total_damages": len(detections['boxes']),
            "damage_types": list(set(detections['classes']))
        },
        "visualized_image_path": f"uploads/{filename}",
        "visualized_image_url": _upload_url(filename),
        "filename": filename
    })


async def _detect_multi_view(current_detector, files: List[UploadFile]) -> JSONResponse:
    """Run detection on several views and deduplicate damages across them."""
    print(f"\nMulti-view detection with {len(files)} images")

    images_list = []
    detections_list = []

    # Process all images
    for idx, img_file in enumerate(files):
        image_bgr = await _read_image_bgr(img_file)
        images_list.append(image_bgr)

        detections = current_detector.detect(image_bgr)
        detections_list.append(detections)

        print(f" View {idx + 1}: {len(detections['boxes'])} detections")

    # Deduplication using ReID
    print("\nPerforming cross-view deduplication...")
    unique_damages = comparator.deduplicate_detections_across_views(
        detections_list, images_list
    )

    # Create and save combined visualization
    combined_img = _build_multiview_canvas(images_list, detections_list, unique_damages)
    filename = f"multiview_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.jpg"
    cv2.imwrite(str(UPLOADS_DIR / filename), combined_img)

    # Return results
    total_detections = sum(len(d['boxes']) for d in detections_list)

    return JSONResponse({
        "status": "success",
        "mode": "multi_view_with_reid",
        "total_detections_all_views": total_detections,
        "unique_damages_count": len(unique_damages),
        "unique_damages": {
            damage_id: {
                "appears_in_views": info['views'],
                "class": info['class'],
                "avg_confidence": float(info['avg_confidence']),
                "detection_count": len(info['detections'])
            }
            for damage_id, info in unique_damages.items()
        },
        "reduction_rate": f"{(1 - len(unique_damages) / total_detections) * 100:.1f}%"
        if total_detections > 0 else "0%",
        "visualized_image_path": f"uploads/{filename}",
        "visualized_image_url": _upload_url(filename),
        "message": f"Detected {total_detections} damages across {len(files)} views, "
                   f"identified {len(unique_damages)} unique damages using ReID"
    })


@app.get("/")
async def root():
    """Root endpoint: list the available endpoints."""
    return {
        "message": "Car Damage Detection API with YOLOv11",
        "endpoints": {
            "/docs": "API documentation",
            "/detect": "Single image detection",
            "/compare": "Compare before/after images (6 pairs)",
            "/uploads/{filename}": "Access saved visualization images",
            "/health": "Health check"
        }
    }


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "model": "YOLOv11"}


@app.post("/detect")
async def detect_single_image(
        file: UploadFile = File(None),
        files: List[UploadFile] = File(None),
        select_models: int = Form(2)
):
    """Detect damages in a single image or in multiple views with deduplication.

    Args:
        file: Single image (backward compatibility). Takes precedence over
            ``files`` when both are supplied.
        files: Multiple views of the same vehicle; detections are
            deduplicated across views.
        select_models: Model selection (0=small, 1=medium, 2=large).

    Raises:
        HTTPException: 400 for an invalid ``select_models``, a missing image
            or an undecodable image; 500 for any other failure.
    """
    try:
        _validate_model_choice(select_models)

        # Load appropriate detector
        current_detector = load_detector(select_models)

        # Case 1: Single image (backward compatibility)
        if file is not None:
            return await _detect_single_view(current_detector, file)

        # Case 2: Multiple images - multi-view detection with ReID
        if files:
            return await _detect_multi_view(current_detector, files)

        raise HTTPException(status_code=400, detail="No image provided")

    except HTTPException:
        # Keep deliberate client errors (4xx) instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Detection failed: {str(e)}") from e


@app.post("/compare")
async def compare_vehicle_damages(
        # Before delivery images (6 positions)
        before_1: UploadFile = File(...),
        before_2: UploadFile = File(...),
        before_3: UploadFile = File(...),
        before_4: UploadFile = File(...),
        before_5: UploadFile = File(...),
        before_6: UploadFile = File(...),
        # After delivery images (6 positions)
        after_1: UploadFile = File(...),
        after_2: UploadFile = File(...),
        after_3: UploadFile = File(...),
        after_4: UploadFile = File(...),
        after_5: UploadFile = File(...),
        after_6: UploadFile = File(...),
        # Model selection
        select_models: int = Form(2),
):
    """Compare before/after images for 6 positions, using ReID.

    Each position pair is analysed on its own, then damages are deduplicated
    across all six views separately for the "before" and "after" sets. The
    overall case is derived from the difference in unique damage counts.

    Args:
        before_1..before_6: Images taken before delivery, one per position.
        after_1..after_6: Images taken after delivery, one per position.
        select_models: Model selection (0=small, 1=medium, 2=large).

    Raises:
        HTTPException: 400 for an invalid ``select_models`` or an undecodable
            image; 500 for any other failure.
    """
    try:
        _validate_model_choice(select_models)

        # Load appropriate detector
        current_detector = load_detector(select_models)

        before_uploads = [before_1, before_2, before_3, before_4, before_5, before_6]
        after_uploads = [after_1, after_2, after_3, after_4, after_5, after_6]

        position_results = []
        grid_results = []
        all_visualizations = []
        image_pairs = []

        # Collected for the cross-view deduplication below
        all_before_images = []
        all_after_images = []
        all_before_detections = []
        all_after_detections = []

        # Overall statistics
        total_new_damages = 0
        total_repaired_damages = 0
        total_matched_damages = 0

        session_id = str(uuid.uuid4())[:8]
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Process each position pair
        for position, (before_upload, after_upload) in enumerate(
                zip(before_uploads, after_uploads), start=1):
            before_bgr = await _read_image_bgr(before_upload)
            after_bgr = await _read_image_bgr(after_upload)

            # Store for multi-view analysis
            all_before_images.append(before_bgr)
            all_after_images.append(after_bgr)
            image_pairs.append((before_bgr, after_bgr))

            # Detect damages
            before_detections = current_detector.detect(before_bgr)
            after_detections = current_detector.detect(after_bgr)

            all_before_detections.append(before_detections)
            all_after_detections.append(after_detections)

            # Comparison with ReID (images are passed along with detections)
            comparison = comparator.analyze_damage_status(
                before_detections, after_detections,
                before_bgr, after_bgr
            )

            # Update statistics
            total_new_damages += len(comparison['new_damages'])
            total_repaired_damages += len(comparison['repaired_damages'])
            total_matched_damages += len(comparison['matched_damages'])

            # Create visualization
            vis_img = visualizer.create_comparison_visualization(
                before_bgr, after_bgr,
                before_detections, after_detections,
                comparison
            )

            vis_filename = f"comparison_{timestamp_str}_{session_id}_pos{position}.jpg"
            cv2.imwrite(str(UPLOADS_DIR / vis_filename), vis_img)

            vis_url = _upload_url(vis_filename)
            all_visualizations.append(vis_url)

            # Store position result with ReID info
            position_result = {
                "case": comparison['case'],
                "message": comparison['message'],
                "statistics": comparison['statistics'],
                "new_damages": comparison['new_damages'],
                "matched_damages": comparison['matched_damages'],
                "repaired_damages": comparison['repaired_damages'],
                "using_reid": comparison['statistics'].get('using_reid', True),
                "visualization_path": f"uploads/{vis_filename}",
                "visualization_url": vis_url,
                "filename": vis_filename
            }
            grid_results.append(position_result)
            position_results.append({f"position_{position}": position_result})

        # Deduplicate BEFORE damages across all 6 views
        unique_before = comparator.deduplicate_detections_across_views(
            all_before_detections, all_before_images
        )

        # Deduplicate AFTER damages across all 6 views
        unique_after = comparator.deduplicate_detections_across_views(
            all_after_detections, all_after_images
        )

        before_total = sum(len(d['boxes']) for d in all_before_detections)
        after_total = sum(len(d['boxes']) for d in all_after_detections)

        print(f"Before: {before_total} detections → {len(unique_before)} unique")
        print(f"After: {after_total} detections → {len(unique_after)} unique")

        # Determine overall case with deduplication
        actual_new_damages = len(unique_after) - len(unique_before)

        if actual_new_damages > 0:
            overall_case = "CASE_2_NEW_DAMAGE"
            overall_message = f"Error during delivery - {actual_new_damages} new unique damage(s) detected"
        elif len(unique_before) > 0:
            overall_case = "CASE_1_EXISTING"
            overall_message = "Existing damages from beginning -> Delivery completed"
        else:
            overall_case = "CASE_3_SUCCESS"
            overall_message = "Successful delivery - No damage detected"

        # Create summary grid
        grid_img = visualizer.create_summary_grid(grid_results, image_pairs)

        grid_filename = f"summary_grid_{timestamp_str}_{session_id}.jpg"
        cv2.imwrite(str(UPLOADS_DIR / grid_filename), grid_img)

        grid_url = _upload_url(grid_filename)

        timestamp = datetime.now().isoformat()

        return JSONResponse({
            "status": "success",
            "session_id": session_id,
            "timestamp": timestamp,
            "reid_enabled": True,
            "overall_result": {
                "case": overall_case,
                "message": overall_message,
                "statistics": {
                    "total_new_damages": int(total_new_damages),
                    "total_matched_damages": int(total_matched_damages),
                    "total_repaired_damages": int(total_repaired_damages),
                    "unique_damages_before": int(len(unique_before)),
                    "unique_damages_after": int(len(unique_after)),
                    "actual_new_unique_damages": int(max(0, actual_new_damages))
                }
            },
            "deduplication_info": {
                "before_total_detections": int(before_total),
                "before_unique_damages": int(len(unique_before)),
                "after_total_detections": int(after_total),
                "after_unique_damages": int(len(unique_after)),
                "duplicate_reduction_rate": f"{(1 - len(unique_after) / after_total) * 100:.1f}%"
                if after_total > 0 else "0%"
            },
            "position_results": position_results,
            "summary_visualization_path": f"uploads/{grid_filename}",
            "summary_visualization_url": grid_url,
            "all_visualizations": all_visualizations,
            "recommendations": {
                "action_required": bool(actual_new_damages > 0),
                "suggested_action": "Investigate delivery process" if actual_new_damages > 0
                else "Proceed with delivery completion"
            }
        })

    except HTTPException:
        # Keep deliberate client errors (4xx) instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Comparison failed: {str(e)}") from e


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 7860)),
        reload=False,
        log_level="info"
    )