"""NHAI drone road-inspection dashboard.

Gradio application that streams frames from a video (or a single still image),
runs the enabled detection services on every frame, overlays the detections,
and publishes metrics, logs, image galleries, a trend chart and a map.
"""

import json
import logging
import os
import random
import shutil
import time
from collections import Counter
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np

# Suppress Ultralytics warning by setting a writable config directory.
# Must be set before the service modules are imported.
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

# Configure logging first so that import failures below also reach app.log.
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# Import service modules
try:
    from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
    from services.detection_service import process_frame as process_generic
    from services.metrics_service import update_metrics
    from services.overlay_service import overlay_boxes
    from services.salesforce_dispatcher import send_to_salesforce
    from services.shadow_detection import detect_shadow_coverage
    from services.thermal_service import process_thermal
    from services.map_service import generate_map
    # Under Construction services
    from services.under_construction.earthwork_detection import process_earthwork
    from services.under_construction.culvert_check import process_culverts
    from services.under_construction.bridge_pier_check import process_bridge_piers
    # Operations Maintenance services
    from services.operations_maintenance.crack_detection import detect_cracks_and_holes
    from services.operations_maintenance.pothole_detection import process_potholes
    from services.operations_maintenance.signage_check import process_signages
    # Road Safety services
    from services.road_safety.barrier_check import process_barriers
    from services.road_safety.lighting_check import process_lighting
    from services.road_safety.accident_spot_check import process_accident_spots
    from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks
    # Plantation services
    from services.plantation.plant_count import process_plants
    from services.plantation.plant_health import process_plant_health
    from services.plantation.missing_patch_check import process_missing_patches
    # General object detection
    from services.object_detection import detect_objects
except ImportError as e:
    print(f"Failed to import service modules: {str(e)}")
    logging.error(f"Import error: {str(e)}")
    # Equivalent to exit(1) but does not depend on the `site` module builtins.
    raise SystemExit(1)

# Global variables
paused: bool = False
frame_rate: float = 0.3
frame_count: int = 0
log_entries: List[str] = []
detected_counts: List[int] = []
last_frame: Optional[np.ndarray] = None
last_metrics: Dict[str, Any] = {}
last_timestamp: str = ""
detected_plants: List[str] = []  # For plants and missing patches
detected_issues: List[str] = []  # For cracks, holes, and other issues
gps_coordinates: List[List[float]] = []
media_loaded: bool = False
active_service: Optional[str] = None
is_video: bool = True
static_image: Optional[np.ndarray] = None
# Track enabled services
enabled_services: List[str] = []

# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = os.path.abspath("temp.jpg")
CAPTURED_FRAMES_DIR = os.path.abspath("captured_frames")
OUTPUT_DIR = os.path.abspath("outputs")
TEMP_MEDIA_DIR = os.path.abspath("temp_media")

# Ensure directories exist with write permissions
for directory in [CAPTURED_FRAMES_DIR, OUTPUT_DIR, TEMP_MEDIA_DIR]:
    os.makedirs(directory, exist_ok=True)
    os.chmod(directory, 0o777)  # Ensure write permissions

# Detectors to run for each service category, in execution order. Every
# detector is called as ``detections, frame = detector(frame)``.
_SERVICE_PIPELINES: Dict[str, Tuple[Callable[..., Any], ...]] = {
    "under_construction": (process_earthwork, process_culverts, process_bridge_piers),
    "operations_maintenance": (detect_cracks_and_holes, process_potholes, process_signages),
    "road_safety": (
        process_barriers,
        process_lighting,
        process_accident_spots,
        detect_potholes_and_cracks,
    ),
    "plantation": (process_plants, process_plant_health, process_missing_patches),
}

# Overlay palette, written as RGB (the order the colour names refer to).
_TYPE_COLORS_RGB: Dict[str, Tuple[int, int, int]] = {
    "plant": (0, 255, 255),          # Cyan for plant count
    "missing_patch": (255, 0, 255),  # Magenta for missing patches
    "earthwork": (255, 105, 180),    # Pink for earthwork
    "culvert": (0, 128, 128),        # Teal for culverts
    "bridge_pier": (255, 127, 127),  # Coral for bridge piers
    "pothole": (255, 0, 0),          # Red for potholes
    "hole": (255, 0, 0),             # Red for holes
    "crack": (255, 105, 180),        # Pink for cracks
    "signage": (255, 255, 102),      # Softer yellow for signage
    "car": (148, 0, 211),            # Violet for cars
    "bike": (0, 255, 255),           # Cyan for bikes
    "person": (50, 205, 50),         # Lime green for humans
    "dog": (210, 105, 30),           # Tan for dogs
}
_HEALTHY_PLANT_RGB: Tuple[int, int, int] = (255, 165, 0)  # Orange for healthy plants
_DEFAULT_RGB: Tuple[int, int, int] = (255, 255, 255)      # White for other objects

_MEDIA_NOT_LOADED_JSON = json.dumps(
    {"error": "Media not loaded. Please upload a video or image file."}, indent=2
)


def _log(message: str, level: int = logging.INFO, log_message: Optional[str] = None) -> None:
    """Append *message* to the UI log and write it to the log file.

    Args:
        message: Text shown in the dashboard's live log.
        level: ``logging`` level used for the file log.
        log_message: Text for the file log when it differs from *message*.
    """
    log_entries.append(message)
    logging.log(level, message if log_message is None else log_message)


def _fail_media_load(status: str) -> str:
    """Mark media as not loaded, record *status* as an error and return it."""
    global media_loaded
    media_loaded = False
    _log(status, logging.ERROR)
    return status


def initialize_media(media_file: Optional[Any] = None) -> str:
    """Load the uploaded media file, or the default video when none is given.

    Args:
        media_file: Upload from the Gradio ``File`` component. Either an
            object exposing the path as ``.name`` or the path itself as a
            string / ``os.PathLike``. ``None`` selects ``DEFAULT_VIDEO_PATH``.

    Returns:
        A human-readable status message (success or error).
    """
    global media_loaded, is_video, static_image, frame_count
    release_video()
    static_image = None
    frame_count = 0  # Reset frame count on new media load

    if media_file is None:
        # If no media file is provided, try the default video
        media_path = DEFAULT_VIDEO_PATH
        _log(f"No media uploaded, attempting to load default: {media_path}")
    else:
        # Accept both a file-like wrapper (``.name``) and a plain path.
        if isinstance(media_file, (str, os.PathLike)):
            original_path = os.fspath(media_file)
        else:
            original_path = getattr(media_file, "name", None)
        if not original_path:
            return _fail_media_load("Error: Invalid media file uploaded.")

        # Copy the uploaded file to a known location to avoid path issues
        upload_extension = os.path.splitext(original_path)[1].lower()
        media_path = os.path.join(TEMP_MEDIA_DIR, f"uploaded_media{upload_extension}")
        try:
            shutil.copy(original_path, media_path)
        except Exception as e:
            return _fail_media_load(f"Error copying uploaded file: {str(e)}")
        _log(f"Copied uploaded file to: {media_path}")

    # Derived from the final path so it is defined for the default video too.
    file_extension = os.path.splitext(media_path)[1].lower()

    # Verify the file exists
    if not os.path.exists(media_path):
        return _fail_media_load(f"Error: Media file '{media_path}' not found.")

    try:
        # Determine if the file is a video or image
        if file_extension in (".mp4", ".avi"):
            is_video = True
            preload_video(media_path)
            status = f"Successfully loaded video: {media_path}"
        elif file_extension in (".jpg", ".jpeg", ".png"):
            is_video = False
            static_image = cv2.imread(media_path)
            if static_image is None:
                raise RuntimeError(f"Failed to load image: {media_path}")
            static_image = cv2.resize(static_image, (320, 240))
            status = f"Successfully loaded image: {media_path}"
        else:
            return _fail_media_load(
                "Error: Unsupported file format. Use .mp4, .avi, .jpg, .jpeg, or .png."
            )
    except Exception as e:
        return _fail_media_load(f"Error loading media: {str(e)}")

    media_loaded = True
    _log(status)
    return status


def set_active_service(
    uc_val: bool,
    om_val: bool,
    rs_val: bool,
    pl_val: bool
) -> Tuple[Optional[str], str]:
    """Record which service categories are enabled.

    Args:
        uc_val: Enable the under-construction services.
        om_val: Enable the operations-maintenance services.
        rs_val: Enable the road-safety services.
        pl_val: Enable the plantation services.

    Returns:
        ``(active_service, status_message)``; ``active_service`` is ``None``
        when nothing is enabled, otherwise ``"selected_services"``.
    """
    global active_service, enabled_services
    toggles = (
        ("under_construction", uc_val),
        ("operations_maintenance", om_val),
        ("road_safety", rs_val),
        ("plantation", pl_val),
    )
    enabled_services = [name for name, enabled in toggles if enabled]

    if not enabled_services:
        active_service = None
        _log("No service category enabled.")
        return None, "No Service Category Enabled"

    active_service = "selected_services"  # Custom state to process only selected services
    _log(f"Enabled services: {', '.join(enabled_services)}")
    return active_service, f"Enabled: {', '.join([s.replace('_', ' ').title() for s in enabled_services])}"


def generate_line_chart() -> Optional[str]:
    """Plot the last 50 per-frame detection counts and save them as a PNG.

    Returns:
        Path of the saved chart, or ``None`` when there is no data yet or the
        chart could not be generated.
    """
    if not detected_counts:
        return None
    fig, ax = plt.subplots(figsize=(4, 2))
    try:
        ax.plot(detected_counts[-50:], marker='o', color='#FF8C00')
        ax.set_title("Detections Over Time")
        ax.set_xlabel("Frame")
        ax.set_ylabel("Count")
        ax.grid(True)
        fig.tight_layout()
        chart_path = "chart_temp.png"
        fig.savefig(chart_path)
        return chart_path
    except Exception as e:
        _log(f"Error generating chart: {str(e)}", logging.ERROR)
        return None
    finally:
        # Always release the figure, otherwise pyplot keeps it alive forever.
        plt.close(fig)


def _acquire_frame(max_retries: int = 3) -> Optional[np.ndarray]:
    """Return the next frame to process, or ``None`` after *max_retries* failures."""
    for attempt in range(1, max_retries + 1):
        try:
            if not is_video:
                return static_image.copy()
            frame = get_next_video_frame()
            if frame is not None:
                return frame
            _log(
                f"Frame retrieval failed on attempt {attempt}, resetting video.",
                logging.WARNING,
            )
            reset_video_index()
        except Exception as e:
            _log(f"Frame retrieval error on attempt {attempt}: {str(e)}", logging.ERROR)
    _log("Failed to retrieve frame after maximum retries.", logging.ERROR)
    return None


def _detection_color_bgr(dtype: str, health: str) -> Tuple[int, int, int]:
    """Return the overlay colour for a detection, in OpenCV's BGR order."""
    if dtype == "plant" and health == "healthy":
        rgb = _HEALTHY_PLANT_RGB
    else:
        rgb = _TYPE_COLORS_RGB.get(dtype, _DEFAULT_RGB)
    # The palette is RGB but frames are BGR until converted for display.
    return rgb[::-1]


def _draw_detections(frame: np.ndarray, items: List[Dict[str, Any]]) -> None:
    """Draw a bounding box and a label for every item that has a ``box``."""
    for item in items:
        box = item.get("box", [])
        if not box:
            continue
        x_min, y_min, x_max, y_max = box
        label = item.get("label", "")
        color = _detection_color_bgr(item.get("type", ""), item.get("health", ""))

        # Draw thicker bounding box
        cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 3)

        # Add a semi-transparent background to the label for better readability.
        # Indices are clamped at 0: a negative slice bound would wrap around
        # and darken a strip on the opposite side of the frame.
        (text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        top = max(0, y_min - text_h - 15)
        bottom = max(0, y_min - 5)
        left = max(0, x_min)
        right = max(0, x_min + text_w + 10)
        label_background = frame[top:bottom, left:right]
        if label_background.size > 0:
            overlay = label_background.copy()
            cv2.rectangle(overlay, (0, 0), (text_w + 10, text_h + 10), (0, 0, 0), -1)
            alpha = 0.5
            cv2.addWeighted(overlay, alpha, label_background, 1 - alpha, 0, label_background)

        # Draw the label text in a contrasting color (white for visibility)
        cv2.putText(frame, label, (x_min + 5, y_min - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)


def _build_outputs() -> Tuple[
    Optional[np.ndarray],
    str,
    str,
    List[str],
    List[str],
    Optional[str],
    Optional[str]
]:
    """Build the UI outputs from the most recently processed frame and metrics."""
    # Resize frame and add metadata for display
    display = cv2.resize(last_frame, (640, 480))
    cv2.putText(display, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(display, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # Generate map
    map_items = [
        item for item in last_metrics.get("items", [])
        if item.get("type") in ["crack", "hole", "pothole", "missing_patch"]
    ]
    map_path = generate_map(gps_coordinates[-5:], map_items)

    return (
        display[:, :, ::-1],  # Convert BGR to RGB for Gradio
        json.dumps(last_metrics, indent=2),
        "\n".join(log_entries[-10:]),
        detected_plants,
        detected_issues,
        generate_line_chart(),
        map_path
    )


def monitor_feed() -> Tuple[
    Optional[np.ndarray],
    str,
    str,
    List[str],
    List[str],
    Optional[str],
    Optional[str]
]:
    """Process one frame and return the values for the dashboard outputs.

    While paused (and a frame has already been processed) the cached frame and
    metrics are returned without running detection, saving files or
    dispatching again.

    Returns:
        ``(frame_rgb, metrics_json, log_text, plant_images, issue_images,
        chart_path, map_path)``. ``frame_rgb`` is ``None`` when no media is
        loaded or no frame could be retrieved.
    """
    global frame_count, last_frame, last_metrics, last_timestamp

    if not media_loaded:
        _log(
            "Cannot start processing: Media not loaded successfully.",
            logging.ERROR,
            "Media not loaded successfully.",
        )
        return (
            None,
            _MEDIA_NOT_LOADED_JSON,
            "\n".join(log_entries[-10:]),
            detected_plants,
            detected_issues,
            None,
            None
        )

    if paused and last_frame is not None:
        return _build_outputs()

    start_time = time.time()
    frame = _acquire_frame()
    if frame is None:
        return (
            None,
            json.dumps(last_metrics, indent=2),
            "\n".join(log_entries[-10:]),
            detected_plants,
            detected_issues,
            None,
            None
        )

    # Resize frame for faster detection (512 wide x 320 high)
    detection_frame = cv2.resize(frame, (512, 320))
    all_detected_items: List[Dict[str, Any]] = []
    shadow_issue = False
    thermal_flag = False

    try:
        # Process only the enabled services
        any_enabled = False
        for category, detectors in _SERVICE_PIPELINES.items():
            if category not in enabled_services:
                continue
            any_enabled = True
            for detector in detectors:
                detections, detection_frame = detector(detection_frame)
                all_detected_items.extend(detections)

        # General Object Detection (only if at least one service is enabled)
        if any_enabled:
            object_dets, detection_frame = detect_objects(detection_frame)
            all_detected_items.extend(object_dets)

        # Apply shadow detection (works on a file, so the frame is saved first)
        try:
            cv2.imwrite(TEMP_IMAGE_PATH, detection_frame)
            shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH)
        except Exception as e:
            _log(
                f"Error saving temp image for shadow detection: {str(e)}",
                logging.ERROR,
                f"Error saving temp image: {str(e)}",
            )
            shadow_issue = False

        # Apply thermal processing if frame is grayscale
        if len(detection_frame.shape) == 2:
            thermal_results = process_thermal(detection_frame)
            thermal_dets = thermal_results["detections"]
            detection_frame = thermal_results["frame"]
            all_detected_items.extend(thermal_dets)
            thermal_flag = bool(thermal_dets)

        # Scale bounding boxes back to original frame size
        orig_h, orig_w = frame.shape[:2]
        det_h, det_w = detection_frame.shape[:2]
        scale_x, scale_y = orig_w / det_w, orig_h / det_h
        for item in all_detected_items:
            if "box" in item:
                box = item["box"]
                item["box"] = [
                    int(box[0] * scale_x),
                    int(box[1] * scale_y),
                    int(box[2] * scale_x),
                    int(box[3] * scale_y)
                ]

        # Overlay detections on the original frame
        _draw_detections(frame, all_detected_items)

        # Save temporary image
        try:
            cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        except Exception as e:
            _log(f"Error saving temp image: {str(e)}", logging.ERROR)
    except Exception as e:
        _log(f"Processing Error: {str(e)}", logging.ERROR, f"Processing error: {str(e)}")
        all_detected_items = []

    # Update detection metrics
    metrics = update_metrics(all_detected_items)

    # Generate GPS coordinates
    gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
    gps_coordinates.append(gps_coord)

    # Add GPS to detected items for mapping
    for item in all_detected_items:
        item["gps"] = gps_coord

    # Save frame if detections are present (for galleries)
    detection_types = {item.get("type") for item in all_detected_items if "type" in item}
    if detection_types:
        try:
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
            success = cv2.imwrite(captured_frame_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
            if not success:
                raise RuntimeError(f"Failed to save captured frame: {captured_frame_path}")
            # Add the frame once per gallery, not once per detection, so the
            # galleries do not fill up with copies of the same image.
            item_types = [item.get("type", "") for item in all_detected_items]
            if any(dtype == "plant" for dtype in item_types):
                detected_plants.append(captured_frame_path)
                del detected_plants[:-100]
            if any(dtype != "plant" for dtype in item_types):
                detected_issues.append(captured_frame_path)
                del detected_issues[:-100]
        except Exception as e:
            _log(f"Error saving captured frame: {str(e)}", logging.ERROR)

    # Prepare data for Salesforce dispatch
    all_detections = {
        "detections": all_detected_items,
        "metrics": metrics,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "frame_count": frame_count,
        "gps_coordinates": gps_coord,
        "shadow_issue": shadow_issue,
        "thermal": thermal_flag
    }

    # Dispatch to Salesforce
    try:
        send_to_salesforce(all_detections)
    except Exception as e:
        _log(
            f"Salesforce Dispatch Error: {str(e)}",
            logging.ERROR,
            f"Salesforce dispatch error: {str(e)}",
        )

    # Save processed frame
    try:
        frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
        success = cv2.imwrite(frame_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
        if not success:
            raise RuntimeError(f"Failed to save output frame: {frame_path}")
    except Exception as e:
        _log(f"Error saving output frame: {str(e)}", logging.ERROR)

    # Update global variables
    frame_count += 1
    last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    last_frame = frame.copy()
    last_metrics = metrics

    # Track detections for metrics
    type_counts = Counter(item.get("type") for item in all_detected_items)
    plant_detected = type_counts["plant"]
    crack_detected = type_counts["crack"]
    hole_detected = type_counts["hole"] + type_counts["pothole"]
    missing_detected = type_counts["missing_patch"]
    car_detected = type_counts["car"]
    bike_detected = type_counts["bike"]
    person_detected = type_counts["person"]
    dog_detected = type_counts["dog"]
    detected_counts.append(
        plant_detected + crack_detected + hole_detected + missing_detected +
        car_detected + bike_detected + person_detected + dog_detected
    )

    # Log frame processing details
    processing_time = time.time() - start_time
    detection_summary = {
        "timestamp": last_timestamp,
        "frame": frame_count,
        "plants": plant_detected,
        "cracks": crack_detected,
        "holes": hole_detected,
        "missing_patches": missing_detected,
        "cars": car_detected,
        "bikes": bike_detected,
        "persons": person_detected,
        "dogs": dog_detected,
        "gps": gps_coord,
        "processing_time_ms": processing_time * 1000
    }
    _log(json.dumps(detection_summary, indent=2))

    # Limit the size of logs and detection data. Several entries can be added
    # per frame, so trim to the cap instead of dropping a single element.
    del log_entries[:-100]
    del detected_counts[:-500]

    return _build_outputs()


# Gradio UI setup with corrected theme
with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange", secondary_hue="amber")) as app:
    gr.Markdown(
        """
        # 🛡️ NHAI Drone Road Inspection Dashboard
        Monitor highway conditions in real-time using drone footage or static images. Select the services to enable below.
        """
    )

    with gr.Row():
        with gr.Column(scale=3):
            media_input = gr.File(label="Upload Media File (e.g., sample.mp4, image.jpg)", file_types=[".mp4", ".avi", ".jpg", ".jpeg", ".png"])
            load_button = gr.Button("Load Media", variant="primary")
        with gr.Column(scale=1):
            media_status = gr.Textbox(
                label="Media Load Status",
                value="Please upload a video/image file or ensure 'sample.mp4' exists in the root directory.",
                interactive=False
            )

    with gr.Row():
        with gr.Column():
            uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
            uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
        with gr.Column():
            om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False)
            om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False)
        with gr.Column():
            rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False)
            rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False)
        with gr.Column():
            pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
            pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)

    status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a media file and enable services to start)")

    with gr.Row():
        with gr.Column(scale=3):
            media_output = gr.Image(label="Live Feed", width=640, height=480, elem_id="live-feed")
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(
                label="Detection Metrics",
                lines=10,
                interactive=False,
                placeholder="Detection metrics, counts will appear here."
            )

    with gr.Row():
        with gr.Column(scale=2):
            logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
        with gr.Column(scale=1):
            plant_images = gr.Gallery(label="Detected Plants (Last 100+)", columns=4, rows=13, height="auto")
            issue_images = gr.Gallery(label="Detected Issues (Last 100+)", columns=4, rows=13, height="auto")

    with gr.Row():
        chart_output = gr.Image(label="Detection Trend")
        map_output = gr.Image(label="Issue Locations Map")

    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause", variant="secondary")
        resume_btn = gr.Button("▶️ Resume", variant="primary")
        frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05)

    gr.HTML(""" """)

    def toggle_pause() -> str:
        """Pause the stream; the last processed frame keeps being shown."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume() -> str:
        """Resume frame processing."""
        global paused
        paused = False
        return "**Status:** 🟢 Streaming"

    def set_frame_rate(val: float) -> None:
        """Set the delay, in seconds, between two processed frames."""
        global frame_rate
        frame_rate = val

    # Try the default video at start-up so the status box reflects reality.
    media_status.value = initialize_media()

    load_button.click(
        initialize_media,
        inputs=[media_input],
        outputs=[media_status]
    )

    def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]:
        """Apply the checkbox states and return the four status labels plus the summary."""
        _, status_message = set_active_service(uc_val, om_val, rs_val, pl_val)
        labels = tuple(
            "Enabled" if flag else "Disabled"
            for flag in (uc_val, om_val, rs_val, pl_val)
        )
        return (*labels, status_message)

    toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
    toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
    for toggle in toggle_inputs:
        toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Yield dashboard updates forever (or once, for a static image)."""
        while True:
            if not media_loaded:
                yield (
                    None,
                    _MEDIA_NOT_LOADED_JSON,
                    "\n".join(log_entries[-10:]),
                    detected_plants,
                    detected_issues,
                    None,
                    None,
                )
            else:
                yield monitor_feed()
                if not is_video:
                    # For static images, yield once and pause
                    break
            time.sleep(frame_rate)

    app.load(streaming_loop, outputs=[media_output, metrics_output, logs_output, plant_images, issue_images, chart_output, map_output])

if __name__ == "__main__":
    app.launch(share=True)  # Set share=True to create a public link