import supervision as sv
from ultralytics import YOLO
import cv2
import numpy as np
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse, Response
import uvicorn
import logging
import logging.config
from datetime import datetime
import os
import time
from collections import defaultdict

# Ensure the logs directory exists
if not os.path.exists("logs"):
    os.makedirs("logs")

app = FastAPI()
# Load the exported ONNX model
onnx_model = YOLO("models/best-data-v5.onnx", task="detect")
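# Assumption: the ONNX weights were produced by an Ultralytics export of the trained
# detector, e.g. YOLO("best-data-v5.pt").export(format="onnx"); the .pt filename is
# illustrative, only models/best-data-v5.onnx is known from this repo.
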
# Define the logging configuration
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
        "access": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
        "file": {
            "formatter": "default",
            "class": "logging.FileHandler",
            "filename": f"logs/{datetime.now().strftime('%Y-%m-%d')}.log",
            "mode": "a",
        },
        "access": {
            "formatter": "access",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "": {
            "handlers": ["default", "file"],
            "level": "INFO",
        },
        "uvicorn.access": {
            "handlers": ["access", "file"],
            "level": "INFO",
            "propagate": False,
        },
        "ultralytics": {
            "handlers": ["default", "file"],
            "level": "INFO",
            "propagate": False,
        },
    },
}
# Apply the logging configuration
logging.config.dictConfig(LOGGING_CONFIG)


def parse_detection(detections):
    """Convert an sv.Detections object into a list of JSON-serializable dicts."""
    parsed_rows = []
    for i in range(len(detections.xyxy)):
        x_min = float(detections.xyxy[i][0])
        y_min = float(detections.xyxy[i][1])
        x_max = float(detections.xyxy[i][2])
        y_max = float(detections.xyxy[i][3])
        width = int(x_max - x_min)
        height = int(y_max - y_min)
        row = {
            "top": int(y_min),
            "left": int(x_min),
            "width": width,
            "height": height,
            "class_id": ""
            if detections.class_id is None
            else int(detections.class_id[i]),
            "confidence": ""
            if detections.confidence is None
            else float(detections.confidence[i]),
            "tracker_id": ""
            if detections.tracker_id is None
            else int(detections.tracker_id[i]),
        }
        # Copy any extra per-detection fields (e.g. class names) stored in detections.data
        if hasattr(detections, "data"):
            for key, value in detections.data.items():
                row[key] = (
                    str(value[i])
                    if hasattr(value, "__getitem__") and value.ndim != 0
                    else str(value)
                )
        parsed_rows.append(row)
    return parsed_rows
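# Illustrative shape of a single parsed row (values are made up; the extra
# "class_name" key comes from detections.data, which sv.Detections.from_ultralytics
# populates with the model's class names):
# {"top": 120, "left": 340, "width": 55, "height": 80, "class_id": 2,
#  "confidence": 0.87, "tracker_id": "", "class_name": "<model class>"}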


# Run inference on a single tile; used by sv.InferenceSlicer in infer() below
def callback(image_slice: np.ndarray) -> sv.Detections:
    # logging.info("Running callback for image slice")
    results = onnx_model(image_slice)[0]
    return sv.Detections.from_ultralytics(results)


def infer(image):
    """Decode raw image bytes, run sliced inference, and return the annotated image plus parsed detections."""
    start_time = time.time()
    image_arr = np.frombuffer(image, np.uint8)
    image = cv2.imdecode(image_arr, cv2.IMREAD_COLOR)
    image = cv2.resize(image, (1920, 1920))
    # Full-image inference (kept for reference; the sliced inference below is what is used)
    # results = onnx_model(image)[0]
    # detections = sv.Detections.from_ultralytics(results)
    slicer = sv.InferenceSlicer(callback=callback, slice_wh=(640, 640))
    detections = slicer(image=image)
    logging.info("Completed slicing and detection")
    parsed_rows = parse_detection(detections)

    # Count the occurrences of each class
    class_counts = defaultdict(int)
    for detection in parsed_rows:
        class_name = detection.get("class_name", "Unknown")
        class_counts[class_name] += 1
    summary_info = ", ".join(
        [f"{count} {class_name}" for class_name, count in class_counts.items()]
    )
    logging.info(f"Summary info: {summary_info}")
    logging.info(f"Run time: {time.time() - start_time:.2f} seconds")

    # label_annotator = sv.LabelAnnotator(text_color=sv.Color.BLACK)
    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
    annotated_image = image.copy()
    annotated_image = bounding_box_annotator.annotate(
        scene=annotated_image, detections=detections
    )
    # annotated_image = label_annotator.annotate(scene=annotated_image, detections=detections)
    # logging.info("Annotated image")
    return annotated_image, parsed_rows


@app.post("/process-image/")
async def process_image(image: UploadFile = File(...), draw_boxes: bool = False):
    filename = image.filename
    logging.info(f"Received process-image request for file: {filename}")
    image_data = await image.read()
    annotated_image, results = infer(image_data)
    if draw_boxes:
        _, img_encoded = cv2.imencode(".jpg", annotated_image)
        logging.info("Returning annotated image")
        return Response(content=img_encoded.tobytes(), media_type="image/jpeg")
    logging.info("Returning JSON results")
    return JSONResponse(content=results)
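
# Example client call (assumes the dev server from the __main__ block below,
# i.e. http://localhost:8001, and a local file named sample.jpg; both illustrative):
#
#   curl -X POST "http://localhost:8001/process-image/?draw_boxes=false" \
#        -F "image=@sample.jpg"
#
# With draw_boxes=true the endpoint returns the annotated JPEG instead of JSON.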


@app.get("/")
def hello_world():
    return "Hello, World!"


if __name__ == "__main__":
    uvicorn.run("main:app", port=8001, reload=True)