import cv2
import numpy as np
from ultralytics import YOLO
from insightface.app import FaceAnalysis
import torchreid

# Configuration
DETECTION_THRESHOLD = 0.75  # Confidence threshold for person detection

# =============================================================================
# MODEL INITIALIZATION
# =============================================================================

# Load the YOLOv8 model for person detection; ByteTrack (attached at track()
# time below) provides consistent tracking IDs on top of the per-frame detections.
model = YOLO('detection.pt')  # Replace with your trained model path

# Initialize InsightFace for facial feature extraction.
# The buffalo_l model pack provides high-quality face embeddings.
face_app = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider'])
face_app.prepare(ctx_id=0)  # Prepare for GPU inference

# Initialize TorchReID for full-body person re-identification.
# OSNet is a lightweight but effective model for person ReID.
reid_extractor = torchreid.utils.FeatureExtractor(
    model_name='osnet_x0_25',
    model_path='osnet_x0_25_market1501.pth',  # Pre-trained on Market1501
    device='cuda'
)

# =============================================================================
# GLOBAL VARIABLES FOR PERSON RE-IDENTIFICATION
# =============================================================================

# Storage for known person embeddings and their assigned global IDs
known_embeddings = []  # Face, body, or combined face+body embeddings
known_ids = []         # Corresponding global ID for each embedding
next_global_id = 1     # Counter for assigning new global IDs

# Mapping from ByteTrack tracker IDs to global person IDs.
# This keeps labels consistent when tracker IDs change.
track_to_global = {}

# =============================================================================
# VIDEO INPUT/OUTPUT SETUP
# =============================================================================

# Initialize video capture and output writer
cap = cv2.VideoCapture("demo.mp4")  # Input video file
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

# Create an output video writer with the same properties as the input
out = cv2.VideoWriter("output.mp4", cv2.VideoWriter_fourcc(*"mp4v"),
                      fps, (width, height))

# =============================================================================
# MAIN PROCESSING LOOP
# =============================================================================
while True:
    ret, frame = cap.read()
    if not ret:
        break  # End of video

    # Run YOLOv8 detection with ByteTrack tracking.
    # persist=True maintains tracker state across frames.
    results = model.track(frame, tracker="bytetrack.yaml", persist=True,
                          verbose=False, conf=DETECTION_THRESHOLD)

    # Process each detection result
    for result in results:
        # Bounding boxes in (x1, y1, x2, y2) format
        boxes = result.boxes.xyxy.cpu().numpy()

        # Tracking IDs, if the tracker produced any this frame
        if result.boxes.id is not None:
            track_ids = result.boxes.id.int().cpu().tolist()
        else:
            # No tracking IDs available; assign None for each detection
            track_ids = [None] * len(boxes)

        # Process each detected person
        for box, track_id in zip(boxes, track_ids):
            x1, y1, x2, y2 = map(int, box)

            # Clamp coordinates to the frame and crop the person
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(width, x2), min(height, y2)
            person_crop = frame[y1:y2, x1:x2]
            if person_crop.size == 0:
                continue  # Skip degenerate boxes

            # Initialize embedding variables
            face_embedding = None
            body_embedding = None

            # =============================================================
            # FACE EMBEDDING EXTRACTION
            # =============================================================
            faces = face_app.get(person_crop)
            if faces:
                # Use the first face detected in the crop
                face_embedding = faces[0].embedding

            # =============================================================
            # BODY EMBEDDING EXTRACTION
            # =============================================================
            try:
                # Resize to 128x256 (w x h) and convert BGR -> RGB for ReID
                body_input = cv2.resize(person_crop, (128, 256))
                body_input = cv2.cvtColor(body_input, cv2.COLOR_BGR2RGB)
                # Extract features and convert to numpy
                body_embedding = reid_extractor(body_input)[0].cpu().numpy()
            except Exception:
                # Crop too small or otherwise invalid; fall back to face only
                pass

            # =============================================================
            # EMBEDDING COMBINATION AND PERSON MATCHING
            # =============================================================
            # Combine face and body embeddings for a robust representation
            embedding = None
            if face_embedding is not None and body_embedding is not None:
                # Concatenate both embeddings for maximum distinctiveness
                embedding = np.concatenate((face_embedding, body_embedding)).astype(np.float32)
            elif face_embedding is not None:
                # Face only: body embedding extraction failed
                embedding = face_embedding.astype(np.float32)
            elif body_embedding is not None:
                # Body only: no face was detected in the crop
                embedding = body_embedding.astype(np.float32)

            # Assign a global ID based on embedding similarity
            if embedding is not None:
                match_found = False

                # Search for similar embeddings among known people
                if known_embeddings:
                    # Only compare embeddings of the same dimension
                    matching_embeddings = [
                        (emb, gid) for emb, gid in zip(known_embeddings, known_ids)
                        if emb.shape[0] == embedding.shape[0]
                    ]
                    if matching_embeddings:
                        embs, gids = zip(*matching_embeddings)
                        embs = np.array(embs)

                        # Cosine similarity against all known embeddings
                        sims = np.dot(embs, embedding) / (
                            np.linalg.norm(embs, axis=1) * np.linalg.norm(embedding) + 1e-6
                        )

                        # Find the best match
                        best_match = np.argmax(sims)
                        if sims[best_match] > 0.6:  # Similarity threshold
                            global_id = gids[best_match]
                            match_found = True

                # No match found: register a new person
                if not match_found:
                    global_id = next_global_id
                    next_global_id += 1
                    known_embeddings.append(embedding)
                    known_ids.append(global_id)

                # Update the tracker-ID -> global-ID mapping
                if track_id is not None:
                    track_to_global[track_id] = global_id
                display_id = global_id
            else:
                # No usable embedding; fall back to the tracker ID
                if track_id is not None:
                    display_id = track_to_global.get(track_id, f"T{track_id}")
                else:
                    display_id = "?"

            # =============================================================
            # VISUALIZATION
            # =============================================================
            # Draw a bounding box around the detected person
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Display the global ID above the bounding box
            cv2.putText(frame, f"ID {display_id}", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

    # =========================================================================
    # OUTPUT AND DISPLAY
    # =========================================================================
    # Write the annotated frame before the display/quit check so the final
    # frame is not dropped when quitting with 'q'
    out.write(frame)

    # Show the frame with tracking results
    cv2.imshow("Tracking + ReID", frame)

    # Break the loop if the 'q' key is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# =============================================================================
# CLEANUP
# =============================================================================
# Release video capture and writer resources
cap.release()
out.release()
cv2.destroyAllWindows()
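
# -----------------------------------------------------------------------------
# Optional refinement (illustrative sketch, not wired into the pipeline above):
# the matching step concatenates raw InsightFace and OSNet embeddings, so the
# modality with the larger magnitude can dominate the cosine similarity.
# Assuming both parts are plain float vectors as produced above, one common
# remedy is to L2-normalize each part before concatenating. The helper below
# is a hypothetical addition, not part of the original script; to try it, swap
# it into the np.concatenate call in the matching section.
# -----------------------------------------------------------------------------
def l2_normalize(vec):
    """Scale a vector to unit length; return it unchanged if it is all zeros."""
    norm = np.linalg.norm(vec)
    return vec / norm if norm > 0 else vec

# Usage (hypothetical):
# embedding = np.concatenate(
#     (l2_normalize(face_embedding), l2_normalize(body_embedding))
# ).astype(np.float32)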