# src/comparison.py
import numpy as np
from typing import List, Dict, Tuple, Optional
from scipy.optimize import linear_sum_assignment
import yaml
import cv2
import hashlib
import torch
try:
from transformers import CLIPModel, CLIPProcessor
CLIP_AVAILABLE = True
except ImportError:
print("CLIP not available. Using traditional features only.")
print(" Install with: pip install transformers")
CLIP_AVAILABLE = False
_GLOBAL_CLIP_MODEL = None
_GLOBAL_CLIP_PROCESSOR = None
def get_clip_model():
"""Get or initialize global CLIP model"""
global _GLOBAL_CLIP_MODEL, _GLOBAL_CLIP_PROCESSOR
if _GLOBAL_CLIP_MODEL is None and CLIP_AVAILABLE:
try:
model_name = "openai/clip-vit-base-patch32"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
_GLOBAL_CLIP_MODEL = CLIPModel.from_pretrained(model_name).to(device)
            _GLOBAL_CLIP_PROCESSOR = CLIPProcessor.from_pretrained(model_name, use_fast=True)
_GLOBAL_CLIP_MODEL.eval()
for param in _GLOBAL_CLIP_MODEL.parameters():
param.requires_grad = False
print(f"✓ CLIP model loaded for ReID: {model_name}")
except Exception as e:
print(f"⚠ CLIP loading failed: {e}. Using fallback features.")
_GLOBAL_CLIP_MODEL = None
_GLOBAL_CLIP_PROCESSOR = None
return _GLOBAL_CLIP_MODEL, _GLOBAL_CLIP_PROCESSOR
class DamageComparator:
"""Enhanced damage comparator with view-invariant re-identification"""
def __init__(self, config_path: str = "config.yaml"):
"""Initialize comparator with configuration"""
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.iou_threshold = self.config['comparison']['iou_match_threshold']
self.position_tolerance = self.config['comparison']['position_tolerance']
# Device selection
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Get global CLIP model instead of creating new one
self.clip_model, self.clip_processor = get_clip_model()
# ReID thresholds
self.reid_similarity_threshold = 0.6
self.feature_cache = {}
def calculate_iou(self, box1: List[int], box2: List[int]) -> float:
"""Calculate Intersection over Union between two boxes"""
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
if x2 < x1 or y2 < y1:
return 0.0
intersection = (x2 - x1) * (y2 - y1)
box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
union = box1_area + box2_area - intersection
if union == 0:
return 0.0
return intersection / union
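    # Worked example (illustrative numbers): box1 = [0, 0, 10, 10] and
    # box2 = [5, 5, 15, 15] overlap in a 5x5 patch, so
    # intersection = 25, union = 100 + 100 - 25 = 175, IoU ≈ 0.143.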
def extract_damage_features(self, image: np.ndarray, bbox: List[int]) -> np.ndarray:
"""
Extract view-invariant features for damage ReID
Args:
image: Full image
bbox: [x1, y1, x2, y2] bounding box
Returns:
Feature vector for ReID
"""
x1, y1, x2, y2 = bbox
# Ensure valid bbox
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2)
damage_roi = image[y1:y2, x1:x2]
        if damage_roi.size == 0:
            # Degenerate ROI: return a zero vector whose length matches the
            # combined layout below (128 CLIP dims when available + 9 geometric
            # + 6 texture + 10 context values) so downstream dot products work
            feature_dim = (128 if self.clip_model is not None else 0) + 9 + 6 + 10
            return np.zeros(feature_dim)
features_list = []
# 1. CLIP features (if available) - Most powerful for ReID
if self.clip_model is not None:
clip_features = self._extract_clip_features(damage_roi)
features_list.append(clip_features)
# 2. Geometric invariant features (always available)
geometric_features = self._extract_geometric_features(damage_roi)
features_list.append(geometric_features)
# 3. Texture features
texture_features = self._extract_texture_features(damage_roi)
features_list.append(texture_features)
# 4. Context features (position on car)
context_features = self._extract_context_features(image, bbox)
features_list.append(context_features)
# Concatenate and normalize
combined_features = np.concatenate(features_list, axis=0)
# L2 normalization for cosine similarity
norm = np.linalg.norm(combined_features)
if norm > 0:
combined_features = combined_features / norm
return combined_features
def _extract_clip_features(self, roi: np.ndarray) -> np.ndarray:
"""Extract CLIP vision features"""
try:
# Convert BGR to RGB
roi_rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
# Process with CLIP
            inputs = self.clip_processor(images=roi_rgb, return_tensors="pt")
inputs = {k: v.to(self.device) for k, v in inputs.items()}
with torch.no_grad():
image_features = self.clip_model.get_image_features(**inputs)
features = image_features.cpu().numpy().flatten()
# Reduce dimensionality
return features[:128] # Take first 128 dimensions
except Exception as e:
return np.zeros(128)
def _extract_geometric_features(self, roi: np.ndarray) -> np.ndarray:
"""Extract geometric invariant features (Hu moments)"""
features = []
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
# Hu moments - invariant to rotation, scale, translation
try:
moments = cv2.moments(gray)
hu_moments = cv2.HuMoments(moments).flatten()
# Log transform for stability
hu_moments = -np.sign(hu_moments) * np.log10(np.abs(hu_moments) + 1e-10)
features.extend(hu_moments[:7])
        except Exception:
features.extend([0] * 7)
# Shape features
edges = cv2.Canny(gray, 50, 150)
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest_contour = max(contours, key=cv2.contourArea)
area = cv2.contourArea(largest_contour)
perimeter = cv2.arcLength(largest_contour, True)
if perimeter > 0:
circularity = 4 * np.pi * area / (perimeter ** 2)
features.append(circularity)
else:
features.append(0)
# Aspect ratio
x, y, w, h = cv2.boundingRect(largest_contour)
aspect_ratio = w / h if h > 0 else 1
features.append(aspect_ratio)
else:
features.extend([0, 0])
return np.array(features)
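    # The geometric vector is always 9 values (7 Hu moments + circularity +
    # aspect ratio), so the concatenation in extract_damage_features sees a
    # fixed length whether or not contours are found.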
def _extract_texture_features(self, roi: np.ndarray) -> np.ndarray:
"""Extract texture features using simplified LBP"""
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
# Resize to fixed size for consistency
gray_resized = cv2.resize(gray, (32, 32))
# Simple texture statistics
features = []
features.append(np.mean(gray_resized))
features.append(np.std(gray_resized))
# Gradient features
dx = cv2.Sobel(gray_resized, cv2.CV_64F, 1, 0, ksize=3)
dy = cv2.Sobel(gray_resized, cv2.CV_64F, 0, 1, ksize=3)
features.append(np.mean(np.abs(dx)))
features.append(np.mean(np.abs(dy)))
features.append(np.std(dx))
features.append(np.std(dy))
return np.array(features)
def _extract_context_features(self, image: np.ndarray, bbox: List[int]) -> np.ndarray:
"""Extract context features (position on car)"""
h, w = image.shape[:2]
x1, y1, x2, y2 = bbox
# Normalized position
cx = (x1 + x2) / 2 / w
cy = (y1 + y2) / 2 / h
width_ratio = (x2 - x1) / w
height_ratio = (y2 - y1) / h
# Position indicators
is_left = cx < 0.33
is_center = 0.33 <= cx <= 0.67
is_right = cx > 0.67
is_top = cy < 0.4
is_middle = 0.4 <= cy <= 0.7
is_bottom = cy > 0.7
features = [
cx, cy, width_ratio, height_ratio,
float(is_left), float(is_center), float(is_right),
float(is_top), float(is_middle), float(is_bottom)
]
return np.array(features)
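    # Worked example (illustrative): for an image 1000 px wide and 800 px tall
    # with bbox [100, 200, 300, 400], cx = 0.2, cy = 0.375, width_ratio = 0.2,
    # height_ratio = 0.25, giving [0.2, 0.375, 0.2, 0.25, 1, 0, 0, 1, 0, 0]
    # (left of the vehicle horizontally, upper band vertically).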
def match_damages_with_reid(self,
detections1: Dict,
detections2: Dict,
image1: Optional[np.ndarray] = None,
image2: Optional[np.ndarray] = None) -> Dict:
"""
Enhanced damage matching with ReID capability
Args:
detections1, detections2: Detection results
image1, image2: Original images for feature extraction
Returns:
Matching results with ReID
"""
boxes1 = detections1['boxes']
boxes2 = detections2['boxes']
print(f"\n🔍 DEBUG match_damages_with_reid:")
print(f" Boxes1: {len(boxes1)}, Boxes2: {len(boxes2)}")
print(f" Images provided: {image1 is not None and image2 is not None}")
if len(boxes1) == 0 and len(boxes2) == 0:
return {
'matched_pairs': [],
'unmatched_before': [],
'unmatched_after': [],
'iou_matrix': None,
'reid_scores': None
}
if len(boxes1) == 0:
return {
'matched_pairs': [],
'unmatched_before': [],
'unmatched_after': list(range(len(boxes2))),
'iou_matrix': None,
'reid_scores': None
}
if len(boxes2) == 0:
return {
'matched_pairs': [],
'unmatched_before': list(range(len(boxes1))),
'unmatched_after': [],
'iou_matrix': None,
'reid_scores': None
}
# Calculate IoU matrix (traditional matching)
iou_matrix = np.zeros((len(boxes1), len(boxes2)))
for i, box1 in enumerate(boxes1):
for j, box2 in enumerate(boxes2):
iou_matrix[i, j] = self.calculate_iou(box1, box2)
# Calculate ReID similarity matrix if images provided
reid_matrix = None
if image1 is not None and image2 is not None:
reid_matrix = np.zeros((len(boxes1), len(boxes2)))
# Extract features for all boxes
features1 = [self.extract_damage_features(image1, box) for box in boxes1]
features2 = [self.extract_damage_features(image2, box) for box in boxes2]
# Calculate cosine similarity
for i, feat1 in enumerate(features1):
for j, feat2 in enumerate(features2):
reid_matrix[i, j] = np.dot(feat1, feat2) # Already normalized
# Combine IoU and ReID scores
if reid_matrix is not None:
# Weighted combination: IoU (spatial) + ReID (appearance)
# Give more weight to ReID for better cross-view matching
print(f" ReID matrix shape: {reid_matrix.shape}")
print(f" ReID max similarity: {reid_matrix.max():.3f}")
print(f" ReID mean similarity: {reid_matrix.mean():.3f}")
print(f" Threshold: {self.reid_similarity_threshold}")
combined_matrix = 0.3 * iou_matrix + 0.7 * reid_matrix
else:
combined_matrix = iou_matrix
# Hungarian algorithm for optimal matching
cost_matrix = 1 - combined_matrix
row_indices, col_indices = linear_sum_assignment(cost_matrix)
# Filter matches by threshold
matched_pairs = []
matched_rows = set()
matched_cols = set()
# Use different threshold based on whether ReID is available
threshold = self.reid_similarity_threshold if reid_matrix is not None else self.iou_threshold
for i, j in zip(row_indices, col_indices):
score = combined_matrix[i, j]
if score >= threshold:
# Also check class consistency
if detections1['classes'][i] == detections2['classes'][j]:
matched_pairs.append((i, j, score))
matched_rows.add(i)
matched_cols.add(j)
# Find unmatched damages
unmatched_before = [i for i in range(len(boxes1)) if i not in matched_rows]
unmatched_after = [j for j in range(len(boxes2)) if j not in matched_cols]
print(f" IoU matrix max: {iou_matrix.max():.3f}")
print(f" Combined score max: {combined_matrix.max():.3f}")
return {
'matched_pairs': matched_pairs,
'unmatched_before': unmatched_before,
'unmatched_after': unmatched_after,
'iou_matrix': iou_matrix.tolist(),
'reid_scores': reid_matrix.tolist() if reid_matrix is not None else None
}
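    # Minimal usage sketch (hypothetical values; detection dicts must carry the
    # 'boxes', 'classes' and 'confidences' keys used above):
    #   dets_before = {'boxes': [[50, 60, 150, 160]], 'classes': ['dent'], 'confidences': [0.91]}
    #   dets_after = {'boxes': [[55, 62, 152, 158]], 'classes': ['dent'], 'confidences': [0.88]}
    #   result = comparator.match_damages_with_reid(dets_before, dets_after, img_before, img_after)
    #   # result['matched_pairs'] holds (index_before, index_after, score) tuples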
def match_damages(self, detections1: Dict, detections2: Dict) -> Dict:
"""
Original matching method (backward compatibility)
"""
return self.match_damages_with_reid(detections1, detections2, None, None)
def analyze_damage_status(self,
before_detections: Dict,
after_detections: Dict,
before_image: Optional[np.ndarray] = None,
after_image: Optional[np.ndarray] = None) -> Dict:
"""
Enhanced damage analysis with ReID support
"""
# Use enhanced matching with ReID if images provided
matching = self.match_damages_with_reid(
before_detections, after_detections,
before_image, after_image
)
# Extract damage information
matched_damages = []
for i, j, score in matching['matched_pairs']:
matched_damages.append({
'type': before_detections['classes'][i],
'confidence_before': float(before_detections['confidences'][i]), # Convert to Python float
'confidence_after': float(after_detections['confidences'][j]), # Convert to Python float
'box_before': before_detections['boxes'][i],
'box_after': after_detections['boxes'][j],
'matching_score': float(score), # Convert to Python float
'is_same_damage': bool(score > self.reid_similarity_threshold) # Convert to Python bool
})
        repaired_damages = []
        for i in matching['unmatched_before']:
            repaired_damages.append({
                'type': before_detections['classes'][i],
                'confidence': float(before_detections['confidences'][i]),  # Convert to Python float
                'box': before_detections['boxes'][i]
            })
new_damages = []
for j in matching['unmatched_after']:
new_damages.append({
'type': after_detections['classes'][j],
'confidence': float(after_detections['confidences'][j]), # Convert to Python float
'box': after_detections['boxes'][j]
})
# Determine case
        case = self._determine_case(matched_damages, repaired_damages, new_damages)
return {
'case': case['type'],
'message': case['message'],
'matched_damages': matched_damages,
            'repaired_damages': repaired_damages,
'new_damages': new_damages,
'statistics': {
'total_before': len(before_detections['boxes']),
'total_after': len(after_detections['boxes']),
'matched': len(matched_damages),
                'repaired': len(repaired_damages),
'new': len(new_damages),
'using_reid': bool(before_image is not None and after_image is not None) # Convert to Python bool
}
}
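    # Example of the summary for one matched dent plus one new scratch
    # (illustrative values):
    #   {'case': 'CASE_2_NEW_DAMAGE', 'message': '...', 'matched_damages': [...],
    #    'statistics': {'total_before': 1, 'total_after': 2, 'matched': 1,
    #                   'repaired': 0, 'new': 1, 'using_reid': True}}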
def _determine_case(self, matched: List, repaired: List, new: List) -> Dict:
"""Determine which case the comparison falls into"""
# Case 3: Happy case - no damages at all
if len(matched) == 0 and len(repaired) == 0 and len(new) == 0:
return {
'type': 'CASE_3_SUCCESS',
'message': 'Successful delivery - No damage detected'
}
# Case 1: Existing damages remain
if len(matched) > 0 and len(new) == 0:
return {
'type': 'CASE_1_EXISTING',
                'message': 'Damage existed before delivery, not caused in transit -> Delivery completed'
}
# Case 2: New damages detected
if len(new) > 0:
return {
'type': 'CASE_2_NEW_DAMAGE',
'message': 'Delivery Defect - New Damage Discovered'
}
# Special case: All damages repaired
if len(repaired) > 0 and len(new) == 0 and len(matched) == 0:
return {
'type': 'CASE_REPAIRED',
'message': 'All damage repaired - Vehicle delivered successfully'
}
return {
'type': 'CASE_UNKNOWN',
'message': 'Status Undetermined'
}
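    # Quick reference for the checks above:
    #   matched = repaired = new = 0   -> CASE_3_SUCCESS
    #   matched > 0 and new = 0        -> CASE_1_EXISTING
    #   new > 0                        -> CASE_2_NEW_DAMAGE
    #   only repaired > 0              -> CASE_REPAIRED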
def deduplicate_detections_across_views(self,
detections_list: List[Dict],
images_list: List[np.ndarray]) -> Dict:
"""
Deduplicate damages across multiple views of the same car
Args:
detections_list: List of detections from different views
images_list: List of corresponding images
Returns:
Unique damages with their appearances in different views
"""
all_damages = []
# Collect all damages with their features
for view_idx, (detections, image) in enumerate(zip(detections_list, images_list)):
for i, bbox in enumerate(detections['boxes']):
features = self.extract_damage_features(image, bbox)
all_damages.append({
'view_idx': view_idx,
'bbox': bbox,
'class': detections['classes'][i],
'confidence': detections['confidences'][i],
'features': features
})
# Group similar damages
groups = []
used = set()
for i, damage1 in enumerate(all_damages):
if i in used:
continue
group = [damage1]
used.add(i)
for j, damage2 in enumerate(all_damages):
if j in used or damage1['view_idx'] == damage2['view_idx']:
continue
# Calculate similarity
similarity = np.dot(damage1['features'], damage2['features'])
if similarity > self.reid_similarity_threshold:
# Check class consistency
if damage1['class'] == damage2['class']:
group.append(damage2)
used.add(j)
groups.append(group)
# Create unique damage IDs
unique_damages = {}
for group_idx, group in enumerate(groups):
# Generate consistent ID based on features
feature_hash = hashlib.md5(
group[0]['features'].tobytes()
).hexdigest()[:8]
damage_id = f"DMG_{feature_hash}"
unique_damages[damage_id] = {
'views': [d['view_idx'] for d in group],
'class': group[0]['class'],
'avg_confidence': np.mean([d['confidence'] for d in group]),
'detections': group
}
return unique_damages
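if __name__ == "__main__":
    # Illustrative smoke test, not part of the original pipeline. Assumes a
    # config.yaml with the comparison.iou_match_threshold and
    # comparison.position_tolerance keys sits next to this file, and uses
    # blank synthetic images in place of real before/after photos.
    comparator = DamageComparator("config.yaml")
    img_before = np.zeros((480, 640, 3), dtype=np.uint8)
    img_after = np.zeros((480, 640, 3), dtype=np.uint8)
    dets_before = {'boxes': [[50, 60, 150, 160]],
                   'classes': ['dent'],
                   'confidences': [0.90]}
    dets_after = {'boxes': [[55, 62, 152, 158], [300, 200, 380, 260]],
                  'classes': ['dent', 'scratch'],
                  'confidences': [0.85, 0.70]}
    report = comparator.analyze_damage_status(dets_before, dets_after, img_before, img_after)
    print(report['case'], report['statistics'])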