"""FastAPI service that runs sliced YOLO (ONNX) object detection on uploaded images.

Endpoints:
    POST /process-image/  -- returns detections as JSON, or an annotated JPEG
                             when ``draw_boxes`` is true.
    GET  /                -- simple liveness response.
"""

import logging
import logging.config  # dictConfig lives in the submodule; `import logging` alone does not load it
import os
import time
from collections import Counter, defaultdict  # noqa: F401  (defaultdict kept as an existing import)
from datetime import datetime
from typing import Any, Dict, List, Tuple

import cv2
import numpy as np
import supervision as sv
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse, Response
from ultralytics import YOLO

# Ensure the logs directory exists (exist_ok avoids the check-then-create race).
os.makedirs("logs", exist_ok=True)

app = FastAPI()

# Load the exported ONNX model
onnx_model = YOLO("models/best-data-v5.onnx", task="detect")

# Every image is resized to this (width, height) before slicing.
TARGET_SIZE = (1920, 1920)
# Size (width, height) of each slice handed to the model by the slicer.
SLICE_SIZE = (640, 640)

# Define the logging configuration
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
        "access": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
        "file": {
            "formatter": "default",
            "class": "logging.FileHandler",
            # The date is evaluated once, when this module is imported.
            "filename": f"logs/{datetime.now().strftime('%Y-%m-%d')}.log",
            "mode": "a",
        },
        "access": {
            "formatter": "access",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "": {
            "handlers": ["default", "file"],
            "level": "INFO",
        },
        "uvicorn.access": {
            "handlers": ["access", "file"],
            "level": "INFO",
            "propagate": False,
        },
        "ultralytics": {
            "handlers": ["default", "file"],
            "level": "INFO",
            "propagate": False,
        },
    },
}

# Apply the logging configuration
logging.config.dictConfig(LOGGING_CONFIG)


def parse_detection(detections) -> List[Dict[str, Any]]:
    """Convert a detections object into a list of JSON-serialisable dicts.

    Args:
        detections: Object exposing ``xyxy``, ``class_id``, ``confidence`` and
            ``tracker_id`` (each indexable per detection or ``None``) and,
            optionally, a ``data`` mapping of extra per-detection values.

    Returns:
        One dict per detection with ``top``, ``left``, ``width``, ``height``,
        ``class_id``, ``confidence`` and ``tracker_id`` keys (missing optional
        fields become ``""``), plus every key of ``detections.data`` as a
        string.
    """
    parsed_rows = []
    extra_data = getattr(detections, "data", None) or {}

    for i, box in enumerate(detections.xyxy):
        x_min, y_min, x_max, y_max = (float(coord) for coord in box[:4])
        row = {
            "top": int(y_min),
            "left": int(x_min),
            "width": int(x_max - x_min),
            "height": int(y_max - y_min),
            "class_id": "" if detections.class_id is None else int(detections.class_id[i]),
            "confidence": "" if detections.confidence is None else float(detections.confidence[i]),
            "tracker_id": "" if detections.tracker_id is None else int(detections.tracker_id[i]),
        }

        for key, value in extra_data.items():
            # Index per-detection sequences; pass scalars (including 0-d
            # arrays) through whole.  getattr() keeps plain lists, which have
            # no ``ndim``, from raising AttributeError.
            is_indexable = hasattr(value, "__getitem__") and getattr(value, "ndim", 1) != 0
            row[key] = str(value[i]) if is_indexable else str(value)

        parsed_rows.append(row)

    return parsed_rows


# Run inference
def callback(image_slice: np.ndarray) -> sv.Detections:
    """Run the ONNX model on a single image slice and wrap the first result."""
    results = onnx_model(image_slice)[0]
    return sv.Detections.from_ultralytics(results)


def infer(image: bytes) -> Tuple[np.ndarray, List[Dict[str, Any]]]:
    """Decode an encoded image, run sliced detection and annotate the result.

    Args:
        image: Raw bytes of an encoded image (e.g. JPEG or PNG).

    Returns:
        ``(annotated_image, parsed_rows)``: the resized image with bounding
        boxes drawn on it, and the detections as produced by
        ``parse_detection``.

    Raises:
        ValueError: If ``image`` is empty or cannot be decoded as an image.
    """
    start_time = time.time()

    if not image:
        raise ValueError("Uploaded file is empty")

    image_arr = np.frombuffer(image, np.uint8)
    decoded = cv2.imdecode(image_arr, cv2.IMREAD_COLOR)
    if decoded is None:
        # cv2.imdecode signals failure by returning None instead of raising.
        raise ValueError("Uploaded file could not be decoded as an image")

    resized = cv2.resize(decoded, TARGET_SIZE)

    # Only the sliced inference is run: the previous extra full-image model
    # call produced a result that was never used.
    slicer = sv.InferenceSlicer(callback=callback, slice_wh=SLICE_SIZE)
    detections = slicer(image=resized)
    logging.info("Completed slicing and detection")

    parsed_rows = parse_detection(detections)

    # Count the occurrences of each class
    class_counts = Counter(row.get("class_name", "Unknown") for row in parsed_rows)
    summary_info = ", ".join(
        f"{count} {class_name}" for class_name, count in class_counts.items()
    )
    logging.info("Summary info: %s", summary_info)
    logging.info("Run time: %.2f seconds", time.time() - start_time)

    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
    annotated_image = bounding_box_annotator.annotate(
        scene=resized.copy(), detections=detections
    )

    return annotated_image, parsed_rows


@app.post("/process-image/")
async def process_image(image: UploadFile = File(...), draw_boxes: bool = False):
    """Detect objects in an uploaded image.

    Args:
        image: The uploaded image file.
        draw_boxes: When true, respond with the annotated image as JPEG;
            otherwise respond with the detections as JSON.

    Raises:
        HTTPException: 400 if the upload is empty or not a decodable image;
            500 if the annotated image cannot be encoded as JPEG.
    """
    filename = image.filename
    logging.info("Received process-image request for file: %s", filename)

    image_data = await image.read()
    try:
        # NOTE(review): infer() is CPU-bound and runs on the event loop; it is
        # left in place because thread-safety of the shared model is unverified.
        annotated_image, results = infer(image_data)
    except ValueError as err:
        logging.warning("Rejected file %s: %s", filename, err)
        raise HTTPException(status_code=400, detail=str(err)) from err

    if draw_boxes:
        encoded_ok, img_encoded = cv2.imencode(".jpg", annotated_image)
        if not encoded_ok:
            raise HTTPException(status_code=500, detail="Failed to encode annotated image")
        logging.info("Returning annotated image")
        return Response(content=img_encoded.tobytes(), media_type="image/jpeg")

    logging.info("Returning JSON results")
    return JSONResponse(content=results)


@app.get("/")
def hello_world():
    """Return a static greeting; usable as a liveness check."""
    return 'Hello, World!'


if __name__ == "__main__":
    uvicorn.run("main:app", port=8001, reload=True)