"""FastAPI service that runs sliced YOLO (ONNX) object detection on uploaded images.

Endpoints:
    POST /process-image/  -- run detection; returns JSON rows, or an annotated
                             JPEG when ``draw_boxes`` is true.
    GET  /                -- trivial liveness response.
"""

import logging
import logging.config  # explicit: ``import logging`` alone does not load the submodule
import os
import time
from collections import defaultdict
from datetime import datetime

import cv2
import numpy as np
import supervision as sv
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse, Response
from ultralytics import YOLO

# Ensure the logs directory exists (exist_ok avoids a check-then-create race).
os.makedirs("logs", exist_ok=True)

app = FastAPI()

# Load the exported ONNX model once at import time; it is shared by all requests.
onnx_model = YOLO("models/best-data-v5.onnx", task="detect")

# Logging configuration: everything goes to stdout and to a dated log file.
# NOTE: the file name is computed once at import, so a long-running process
# keeps writing to the file named after its start date.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
        "access": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
        "file": {
            "formatter": "default",
            "class": "logging.FileHandler",
            "filename": f"logs/{datetime.now().strftime('%Y-%m-%d')}.log",
            "mode": "a",
        },
        "access": {
            "formatter": "access",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "": {
            "handlers": ["default", "file"],
            "level": "INFO",
        },
        "uvicorn.access": {
            "handlers": ["access", "file"],
            "level": "INFO",
            "propagate": False,
        },
        "ultralytics": {
            "handlers": ["default", "file"],
            "level": "INFO",
            "propagate": False,
        },
    },
}

# Apply the logging configuration.
logging.config.dictConfig(LOGGING_CONFIG)


class InvalidImageError(ValueError):
    """Raised by ``infer`` when the uploaded bytes are empty or not a decodable image."""


def _data_value_at(value, index):
    """Return the string form of ``value`` for detection ``index``.

    Indexable, non-scalar values (arrays with ``ndim != 0``, lists, tuples) are
    indexed per detection; anything else is stringified as a whole. Objects
    without an ``ndim`` attribute (e.g. plain lists) are treated as indexable
    rather than raising ``AttributeError``.
    """
    if isinstance(value, (str, bytes)) or not hasattr(value, "__getitem__"):
        return str(value)
    if getattr(value, "ndim", 1) == 0:
        return str(value)
    return str(value[index])


def parse_detection(detections, scale_x, scale_y):
    """Convert detections into JSON-serialisable rows in original-image pixels.

    Args:
        detections: Object exposing ``xyxy``, ``class_id``, ``confidence``,
            ``tracker_id`` and optionally ``data`` (the ``sv.Detections``
            produced by the slicer in ``infer``).
        scale_x: Horizontal factor (resized width / original width) by which
            the box coordinates are divided.
        scale_y: Vertical factor (resized height / original height) by which
            the box coordinates are divided.

    Returns:
        A list of dicts with ``top``, ``left``, ``width``, ``height``,
        ``class_id``, ``confidence``, ``tracker_id`` plus one stringified
        entry per key in ``detections.data``. Missing optional fields are
        reported as ``""``.
    """
    class_ids = detections.class_id
    confidences = detections.confidence
    tracker_ids = detections.tracker_id
    extra_data = detections.data if hasattr(detections, "data") else {}

    parsed_rows = []
    for i, box in enumerate(detections.xyxy):
        # Rescale the coordinates to match the original image size.
        x_min = float(box[0]) / scale_x
        y_min = float(box[1]) / scale_y
        x_max = float(box[2]) / scale_x
        y_max = float(box[3]) / scale_y

        row = {
            "top": int(y_min),
            "left": int(x_min),
            "width": int(x_max - x_min),
            "height": int(y_max - y_min),
            "class_id": "" if class_ids is None else int(class_ids[i]),
            "confidence": "" if confidences is None else float(confidences[i]),
            "tracker_id": "" if tracker_ids is None else int(tracker_ids[i]),
        }
        for key, value in extra_data.items():
            row[key] = _data_value_at(value, i)
        parsed_rows.append(row)
    return parsed_rows


def callback(image_slice: np.ndarray) -> sv.Detections:
    """Run the ONNX model on one image slice and wrap the first result as detections."""
    results = onnx_model(image_slice)[0]
    return sv.Detections.from_ultralytics(results)


def infer(image):
    """Decode an uploaded image, run sliced detection and annotate it.

    The image is resized to 1920x1920 for detection (aspect ratio is not
    preserved); detection rows are scaled back to the original dimensions and
    the annotated image is resized back as well.

    Args:
        image: Raw encoded image bytes (e.g. the body of an uploaded file).

    Returns:
        ``(annotated_image, parsed_rows)`` -- the annotated image at the
        original width/height, and the rows from ``parse_detection``.

    Raises:
        InvalidImageError: If ``image`` is empty or cannot be decoded.
    """
    start_time = time.time()

    # cv2.imdecode asserts on an empty buffer and returns None on undecodable
    # data; surface both as a clear, catchable error instead of a crash.
    if not image:
        raise InvalidImageError("Uploaded file is empty")
    image_arr = np.frombuffer(image, np.uint8)
    image = cv2.imdecode(image_arr, cv2.IMREAD_COLOR)
    if image is None:
        raise InvalidImageError("Uploaded file could not be decoded as an image")

    # Get original dimensions.
    original_height, original_width = image.shape[:2]

    # Resize image for detection.
    target_size = 1920
    image = cv2.resize(image, (target_size, target_size))

    # Scale factors from original to resized coordinates.
    scale_x = target_size / original_width
    scale_y = target_size / original_height

    # Detect on 640x640 slices of the resized image. (The previous extra
    # full-image model call was never used and has been removed.)
    slicer = sv.InferenceSlicer(callback=callback, slice_wh=(640, 640))
    detections = slicer(image=image)
    logging.info("Completed slicing and detection")

    # Parse detections and adjust coordinates to original size.
    parsed_rows = parse_detection(detections, scale_x, scale_y)

    # Count the occurrences of each class.
    class_counts = defaultdict(int)
    for detection in parsed_rows:
        class_counts[detection.get("class_name", "Unknown")] += 1

    summary_info = ", ".join(
        f"{count} {class_name}" for class_name, count in class_counts.items()
    )
    logging.info("Summary info: %s", summary_info)
    logging.info("Run time: %.2f seconds", time.time() - start_time)

    # Annotate the resized image (detections are in resized coordinates).
    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
    annotated_image = bounding_box_annotator.annotate(
        scene=image.copy(), detections=detections
    )

    # Resize the annotated image back to original dimensions.
    annotated_image = cv2.resize(annotated_image, (original_width, original_height))

    return annotated_image, parsed_rows


@app.post("/process-image/")
async def process_image(image: UploadFile = File(...), draw_boxes: bool = False):
    """Run detection on an uploaded image.

    Returns the annotated image as JPEG when ``draw_boxes`` is true, otherwise
    the detection rows as JSON. Responds with HTTP 400 when the upload is
    empty or not a decodable image.
    """
    filename = image.filename
    logging.info("Received process-image request for file: %s", filename)

    image_data = await image.read()
    try:
        annotated_image, results = infer(image_data)
    except InvalidImageError as err:
        logging.warning("Rejected file %s: %s", filename, err)
        raise HTTPException(status_code=400, detail=str(err)) from err

    if draw_boxes:
        encoded_ok, img_encoded = cv2.imencode(".jpg", annotated_image)
        if not encoded_ok:
            raise HTTPException(
                status_code=500, detail="Failed to encode annotated image"
            )
        logging.info("Returning annotated image")
        return Response(content=img_encoded.tobytes(), media_type="image/jpeg")

    logging.info("Returning JSON results")
    return JSONResponse(content=results)


@app.get("/")
def hello_world():
    """Simple liveness endpoint."""
    return 'Hello, World!'


if __name__ == "__main__":
    uvicorn.run("main:app", port=8001, reload=True)