import numpy as np from typing import List, Dict from scipy.optimize import linear_sum_assignment import yaml class DamageComparator: """Compare damages between before and after images""" def __init__(self, config_path: str = "config.yaml"): """Initialize comparator with configuration""" with open(config_path, 'r') as f: self.config = yaml.safe_load(f) self.iou_threshold = self.config['comparison']['iou_match_threshold'] self.position_tolerance = self.config['comparison']['position_tolerance'] def calculate_iou(self, box1: List[int], box2: List[int]) -> float: """ Calculate Intersection over Union between two boxes Args: box1, box2: Bounding boxes in format [x1, y1, x2, y2] Returns: IoU value between 0 and 1 """ # Calculate intersection area x1 = max(box1[0], box2[0]) y1 = max(box1[1], box2[1]) x2 = min(box1[2], box2[2]) y2 = min(box1[3], box2[3]) if x2 < x1 or y2 < y1: return 0.0 intersection = (x2 - x1) * (y2 - y1) # Calculate union area box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) union = box1_area + box2_area - intersection # Calculate IoU if union == 0: return 0.0 return intersection / union def match_damages(self, detections1: Dict, detections2: Dict) -> Dict: """ Match damages between two sets of detections using Hungarian algorithm Args: detections1: First detection results (before) detections2: Second detection results (after) Returns: Matching results with paired and unpaired damages """ boxes1 = detections1['boxes'] boxes2 = detections2['boxes'] if len(boxes1) == 0 and len(boxes2) == 0: return { 'matched_pairs': [], 'unmatched_before': [], 'unmatched_after': [], 'iou_matrix': None } if len(boxes1) == 0: return { 'matched_pairs': [], 'unmatched_before': [], 'unmatched_after': list(range(len(boxes2))), 'iou_matrix': None } if len(boxes2) == 0: return { 'matched_pairs': [], 'unmatched_before': list(range(len(boxes1))), 'unmatched_after': [], 'iou_matrix': None } # Calculate IoU matrix iou_matrix = np.zeros((len(boxes1), len(boxes2))) for i, box1 in enumerate(boxes1): for j, box2 in enumerate(boxes2): iou_matrix[i, j] = self.calculate_iou(box1, box2) # Use Hungarian algorithm for optimal matching # Convert to cost matrix (1 - IoU) cost_matrix = 1 - iou_matrix row_indices, col_indices = linear_sum_assignment(cost_matrix) # Filter matches by IoU threshold matched_pairs = [] matched_rows = set() matched_cols = set() for i, j in zip(row_indices, col_indices): if iou_matrix[i, j] >= self.iou_threshold: # Also check if damage types match if detections1['classes'][i] == detections2['classes'][j]: matched_pairs.append((i, j, iou_matrix[i, j])) matched_rows.add(i) matched_cols.add(j) # Find unmatched damages unmatched_before = [i for i in range(len(boxes1)) if i not in matched_rows] unmatched_after = [j for j in range(len(boxes2)) if j not in matched_cols] return { 'matched_pairs': matched_pairs, 'unmatched_before': unmatched_before, 'unmatched_after': unmatched_after, 'iou_matrix': iou_matrix.tolist() } def analyze_damage_status(self, before_detections: Dict, after_detections: Dict) -> Dict: """ Analyze damage status between before and after images Returns detailed analysis with case classification """ matching = self.match_damages(before_detections, after_detections) # Extract damage information matched_damages = [] for i, j, iou in matching['matched_pairs']: matched_damages.append({ 'type': before_detections['classes'][i], 'confidence_before': before_detections['confidences'][i], 'confidence_after': after_detections['confidences'][j], 'box_before': before_detections['boxes'][i], 'box_after': after_detections['boxes'][j], 'iou': iou }) existing_damages = [] for i in matching['unmatched_before']: existing_damages.append({ 'type': before_detections['classes'][i], 'confidence': before_detections['confidences'][i], 'box': before_detections['boxes'][i] }) new_damages = [] for j in matching['unmatched_after']: new_damages.append({ 'type': after_detections['classes'][j], 'confidence': after_detections['confidences'][j], 'box': after_detections['boxes'][j] }) # Determine case case = self._determine_case(matched_damages, existing_damages, new_damages) return { 'case': case['type'], 'message': case['message'], 'matched_damages': matched_damages, 'repaired_damages': existing_damages, # Damages that were there before but not after 'new_damages': new_damages, 'statistics': { 'total_before': len(before_detections['boxes']), 'total_after': len(after_detections['boxes']), 'matched': len(matched_damages), 'repaired': len(existing_damages), 'new': len(new_damages) } } def _determine_case(self, matched: List, repaired: List, new: List) -> Dict: """Determine which case the comparison falls into""" # Case 3: Happy case - no damages at all if len(matched) == 0 and len(repaired) == 0 and len(new) == 0: return { 'type': 'CASE_3_SUCCESS', 'message': 'Successful delivery - No damage detected' } # Case 1: Existing damages remain (with or without repairs/new damages) if len(matched) > 0 and len(new) == 0: return { 'type': 'CASE_1_EXISTING', 'message': 'Error from the beginning, not during the delivery process -> Delivery completed' } # Case 2: New damages detected if len(new) > 0: return { 'type': 'CASE_2_NEW_DAMAGE', 'message': 'Delivery Defect - New Damage Discovered' } # Special case: All damages repaired if len(repaired) > 0 and len(new) == 0 and len(matched) == 0: return { 'type': 'CASE_REPAIRED', 'message': 'All damage repaired - Vehicle delivered successfully' } return { 'type': 'CASE_UNKNOWN', 'message': 'Status Undetermined' }