Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import cv2 | |
| import time | |
| import json | |
| import random | |
| import logging | |
| import matplotlib.pyplot as plt | |
| import shutil | |
| from datetime import datetime | |
| from collections import Counter | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| # Suppress Ultralytics warning by setting a writable config directory | |
| os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" | |
| # Import service modules | |
| try: | |
| from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video | |
| from services.detection_service import process_frame as process_generic | |
| from services.metrics_service import update_metrics | |
| from services.overlay_service import overlay_boxes | |
| from services.salesforce_dispatcher import send_to_salesforce | |
| from services.shadow_detection import detect_shadow_coverage | |
| from services.thermal_service import process_thermal | |
| from services.map_service import generate_map | |
| # Under Construction services | |
| from services.under_construction.earthwork_detection import process_earthwork | |
| from services.under_construction.culvert_check import process_culverts | |
| from services.under_construction.bridge_pier_check import process_bridge_piers | |
| # Operations Maintenance services | |
| from services.operations_maintenance.crack_detection import detect_cracks_and_holes | |
| from services.operations_maintenance.pothole_detection import process_potholes | |
| from services.operations_maintenance.signage_check import process_signages | |
| # Road Safety services | |
| from services.road_safety.barrier_check import process_barriers | |
| from services.road_safety.lighting_check import process_lighting | |
| from services.road_safety.accident_spot_check import process_accident_spots | |
| from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks | |
| # Plantation services | |
| from services.plantation.plant_count import process_plants | |
| from services.plantation.plant_health import process_plant_health | |
| from services.plantation.missing_patch_check import process_missing_patches | |
| # General object detection | |
| from services.object_detection import detect_objects | |
| except ImportError as e: | |
| print(f"Failed to import service modules: {str(e)}") | |
| logging.error(f"Import error: {str(e)}") | |
| exit(1) | |
| # Configure logging | |
| logging.basicConfig( | |
| filename="app.log", | |
| level=logging.INFO, | |
| format="%(asctime)s - %(levelname)s - %(message)s" | |
| ) | |
| # Global variables | |
| paused: bool = False | |
| frame_rate: float = 0.3 | |
| frame_count: int = 0 | |
| log_entries: List[str] = [] | |
| detected_counts: List[int] = [] | |
| last_frame: Optional[np.ndarray] = None | |
| last_metrics: Dict[str, Any] = {} | |
| last_timestamp: str = "" | |
| detected_plants: List[str] = [] # For plants and missing patches | |
| detected_issues: List[str] = [] # For cracks, holes, and other issues | |
| gps_coordinates: List[List[float]] = [] | |
| media_loaded: bool = False | |
| active_service: Optional[str] = None | |
| is_video: bool = True | |
| static_image: Optional[np.ndarray] = None | |
| # Constants | |
| DEFAULT_VIDEO_PATH = "sample.mp4" | |
| TEMP_IMAGE_PATH = os.path.abspath("temp.jpg") | |
| CAPTURED_FRAMES_DIR = os.path.abspath("captured_frames") | |
| OUTPUT_DIR = os.path.abspath("outputs") | |
| TEMP_MEDIA_DIR = os.path.abspath("temp_media") | |
| # Ensure directories exist with write permissions | |
| for directory in [CAPTURED_FRAMES_DIR, OUTPUT_DIR, TEMP_MEDIA_DIR]: | |
| os.makedirs(directory, exist_ok=True) | |
| os.chmod(directory, 0o777) # Ensure write permissions | |
| def initialize_media(media_file: Optional[Any] = None) -> str: | |
| global media_loaded, is_video, static_image, log_entries, frame_count | |
| release_video() | |
| static_image = None | |
| frame_count = 0 # Reset frame count on new media load | |
| # If no media file is provided, try the default video | |
| if media_file is None: | |
| media_path = DEFAULT_VIDEO_PATH | |
| log_entries.append(f"No media uploaded, attempting to load default: {media_path}") | |
| logging.info(f"No media uploaded, attempting to load default: {media_path}") | |
| else: | |
| # Validate media file | |
| if not hasattr(media_file, 'name') or not media_file.name: | |
| status = "Error: Invalid media file uploaded." | |
| log_entries.append(status) | |
| logging.error(status) | |
| media_loaded = False | |
| return status | |
| # Copy the uploaded file to a known location to avoid path issues | |
| original_path = media_file.name | |
| file_extension = os.path.splitext(original_path)[1].lower() | |
| temp_media_path = os.path.join(TEMP_MEDIA_DIR, f"uploaded_media{file_extension}") | |
| try: | |
| shutil.copy(original_path, temp_media_path) | |
| media_path = temp_media_path | |
| log_entries.append(f"Copied uploaded file to: {media_path}") | |
| logging.info(f"Copied uploaded file to: {media_path}") | |
| except Exception as e: | |
| status = f"Error copying uploaded file: {str(e)}" | |
| log_entries.append(status) | |
| logging.error(status) | |
| media_loaded = False | |
| return status | |
| # Verify the file exists | |
| if not os.path.exists(media_path): | |
| status = f"Error: Media file '{media_path}' not found." | |
| log_entries.append(status) | |
| logging.error(status) | |
| media_loaded = False | |
| return status | |
| try: | |
| # Determine if the file is a video or image | |
| if file_extension in (".mp4", ".avi"): | |
| is_video = True | |
| preload_video(media_path) | |
| media_loaded = True | |
| status = f"Successfully loaded video: {media_path}" | |
| elif file_extension in (".jpg", ".jpeg", ".png"): | |
| is_video = False | |
| static_image = cv2.imread(media_path) | |
| if static_image is None: | |
| raise RuntimeError(f"Failed to load image: {media_path}") | |
| static_image = cv2.resize(static_image, (320, 240)) | |
| media_loaded = True | |
| status = f"Successfully loaded image: {media_path}" | |
| else: | |
| media_loaded = False | |
| status = "Error: Unsupported file format. Use .mp4, .avi, .jpg, .jpeg, or .png." | |
| log_entries.append(status) | |
| logging.error(status) | |
| return status | |
| log_entries.append(status) | |
| logging.info(status) | |
| return status | |
| except Exception as e: | |
| media_loaded = False | |
| status = f"Error loading media: {str(e)}" | |
| log_entries.append(status) | |
| logging.error(status) | |
| return status | |
| def set_active_service( | |
| service_name: str, | |
| uc_val: bool, | |
| om_val: bool, | |
| rs_val: bool, | |
| pl_val: bool | |
| ) -> Tuple[Optional[str], str]: | |
| global active_service | |
| # Enable all requested services | |
| enabled_services = [] | |
| if uc_val: | |
| enabled_services.append("under_construction") | |
| if om_val: | |
| enabled_services.append("operations_maintenance") | |
| if rs_val: | |
| enabled_services.append("road_safety") | |
| if pl_val: | |
| enabled_services.append("plantation") | |
| if not enabled_services: | |
| active_service = None | |
| log_entries.append("No service category enabled.") | |
| logging.info("No service category enabled.") | |
| return None, "No Service Category Enabled" | |
| # Since multiple services are requested, we'll process all enabled services | |
| active_service = "all_enabled" # Custom state to process all enabled services | |
| log_entries.append(f"Enabled services: {', '.join(enabled_services)}") | |
| logging.info(f"Enabled services: {', '.join(enabled_services)}") | |
| return active_service, f"Enabled: {', '.join([s.replace('_', ' ').title() for s in enabled_services])}" | |
| def generate_line_chart() -> Optional[str]: | |
| if not detected_counts: | |
| return None | |
| fig, ax = plt.subplots(figsize=(4, 2)) | |
| ax.plot(detected_counts[-50:], marker='o', color='#4682B4') | |
| ax.set_title("Detections Over Time") | |
| ax.set_xlabel("Frame") | |
| ax.set_ylabel("Count") | |
| ax.grid(True) | |
| fig.tight_layout() | |
| chart_path = "chart_temp.png" | |
| try: | |
| fig.savefig(chart_path) | |
| plt.close(fig) | |
| return chart_path | |
| except Exception as e: | |
| log_entries.append(f"Error generating chart: {str(e)}") | |
| logging.error(f"Error generating chart: {str(e)}") | |
| return None | |
| def monitor_feed() -> Tuple[ | |
| Optional[np.ndarray], | |
| str, | |
| str, | |
| List[str], | |
| List[str], | |
| Optional[str], | |
| Optional[str] | |
| ]: | |
| global paused, frame_count, last_frame, last_metrics, last_timestamp | |
| global gps_coordinates, detected_plants, detected_issues, media_loaded | |
| global is_video, static_image | |
| if not media_loaded: | |
| log_entries.append("Cannot start processing: Media not loaded successfully.") | |
| logging.error("Media not loaded successfully.") | |
| return ( | |
| None, | |
| json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2), | |
| "\n".join(log_entries[-10:]), | |
| detected_plants, | |
| detected_issues, | |
| None, | |
| None | |
| ) | |
| if paused and last_frame is not None: | |
| frame = last_frame.copy() | |
| metrics = last_metrics.copy() | |
| else: | |
| max_retries = 3 | |
| start_time = time.time() | |
| for attempt in range(max_retries): | |
| try: | |
| if is_video: | |
| frame = get_next_video_frame() | |
| if frame is None: | |
| log_entries.append(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.") | |
| logging.warning(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.") | |
| reset_video_index() | |
| continue | |
| break | |
| else: | |
| frame = static_image.copy() | |
| break | |
| except Exception as e: | |
| log_entries.append(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}") | |
| logging.error(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}") | |
| if attempt == max_retries - 1: | |
| return ( | |
| None, | |
| json.dumps(last_metrics, indent=2), | |
| "\n".join(log_entries[-10:]), | |
| detected_plants, | |
| detected_issues, | |
| None, | |
| None | |
| ) | |
| else: | |
| log_entries.append("Failed to retrieve frame after maximum retries.") | |
| logging.error("Failed to retrieve frame after maximum retries.") | |
| return ( | |
| None, | |
| json.dumps(last_metrics, indent=2), | |
| "\n".join(log_entries[-10:]), | |
| detected_plants, | |
| detected_issues, | |
| None, | |
| None | |
| ) | |
| # Resize frame for faster detection (320x512) | |
| detection_frame = cv2.resize(frame, (512, 320)) | |
| all_detected_items: List[Dict[str, Any]] = [] | |
| shadow_issue = False | |
| thermal_flag = False | |
| try: | |
| # Process all enabled services | |
| # Under Construction Services | |
| earthwork_dets, detection_frame = process_earthwork(detection_frame) | |
| culvert_dets, detection_frame = process_culverts(detection_frame) | |
| bridge_pier_dets, detection_frame = process_bridge_piers(detection_frame) | |
| all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets) | |
| # Operations Maintenance Services | |
| crack_hole_dets, detection_frame = detect_cracks_and_holes(detection_frame) | |
| pothole_dets, detection_frame = process_potholes(detection_frame) | |
| signage_dets, detection_frame = process_signages(detection_frame) | |
| all_detected_items.extend(crack_hole_dets + pothole_dets + signage_dets) | |
| # Road Safety Services | |
| barrier_dets, detection_frame = process_barriers(detection_frame) | |
| lighting_dets, detection_frame = process_lighting(detection_frame) | |
| accident_dets, detection_frame = process_accident_spots(detection_frame) | |
| pothole_crack_dets, detection_frame = detect_potholes_and_cracks(detection_frame) | |
| all_detected_items.extend(barrier_dets + lighting_dets + accident_dets + pothole_crack_dets) | |
| # Plantation Services | |
| plant_dets, detection_frame = process_plants(detection_frame) | |
| health_dets, detection_frame = process_plant_health(detection_frame) | |
| missing_dets, detection_frame = process_missing_patches(detection_frame) | |
| all_detected_items.extend(plant_dets + health_dets + missing_dets) | |
| # General Object Detection (cars, bikes, humans, dogs, etc.) | |
| object_dets, detection_frame = detect_objects(detection_frame) | |
| all_detected_items.extend(object_dets) | |
| # Apply shadow detection | |
| try: | |
| cv2.imwrite(TEMP_IMAGE_PATH, detection_frame) | |
| shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH) | |
| except Exception as e: | |
| log_entries.append(f"Error saving temp image for shadow detection: {str(e)}") | |
| logging.error(f"Error saving temp image: {str(e)}") | |
| shadow_issue = False | |
| # Apply thermal processing if frame is grayscale | |
| if len(detection_frame.shape) == 2: | |
| thermal_results = process_thermal(detection_frame) | |
| thermal_dets = thermal_results["detections"] | |
| detection_frame = thermal_results["frame"] | |
| all_detected_items.extend(thermal_dets) | |
| thermal_flag = bool(thermal_dets) | |
| # Scale bounding boxes back to original frame size | |
| orig_h, orig_w = frame.shape[:2] | |
| det_h, det_w = detection_frame.shape[:2] | |
| scale_x, scale_y = orig_w / det_w, orig_h / det_h | |
| for item in all_detected_items: | |
| if "box" in item: | |
| box = item["box"] | |
| item["box"] = [ | |
| int(box[0] * scale_x), | |
| int(box[1] * scale_y), | |
| int(box[2] * scale_x), | |
| int(box[3] * scale_y) | |
| ] | |
| # Overlay detections on the original frame with specified colors | |
| for item in all_detected_items: | |
| box = item.get("box", []) | |
| if not box: | |
| continue | |
| x_min, y_min, x_max, y_max = box | |
| label = item.get("label", "") | |
| dtype = item.get("type", "") | |
| health = item.get("health", "") # For plant health | |
| # Assign colors based on detection type as per requirements | |
| if dtype == "plant": | |
| color = (255, 0, 0) # Blue mark for plant count | |
| if health == "healthy": | |
| color = (255, 165, 0) # Orange mark for healthy plants | |
| elif dtype == "missing_patch": | |
| color = (0, 0, 255) # Red mark for missing patches | |
| elif dtype == "earthwork": | |
| color = (255, 105, 180) # Pink for earthwork | |
| elif dtype == "culvert": | |
| color = (0, 165, 255) # Blue and orange mix (approximated) | |
| elif dtype == "bridge_pier": | |
| color = (255, 99, 71) # Light red for bridge piers | |
| elif dtype == "pothole" or dtype == "hole": | |
| color = (255, 0, 0) # Red for potholes (from pothole_detection and pothole_crack_detection) | |
| elif dtype == "crack": | |
| color = (255, 105, 180) # Pink for cracks | |
| elif dtype == "signage": | |
| color = (255, 255, 0) # Yellow for signage | |
| elif dtype == "car": | |
| color = (128, 0, 128) # Purple for cars | |
| elif dtype == "bike": | |
| color = (0, 255, 255) # Cyan for bikes | |
| elif dtype == "person": | |
| color = (0, 255, 0) # Green for humans | |
| elif dtype == "dog": | |
| color = (139, 69, 19) # Brown for dogs | |
| else: | |
| color = (255, 255, 255) # White for other objects | |
| cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2) | |
| cv2.putText(frame, label, (x_min, y_min - 10), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) | |
| # Save temporary image | |
| try: | |
| cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95]) | |
| except Exception as e: | |
| log_entries.append(f"Error saving temp image: {str(e)}") | |
| logging.error(f"Error saving temp image: {str(e)}") | |
| except Exception as e: | |
| log_entries.append(f"Processing Error: {str(e)}") | |
| logging.error(f"Processing error: {str(e)}") | |
| all_detected_items = [] | |
| # Update detection metrics | |
| metrics = update_metrics(all_detected_items) | |
| # Generate GPS coordinates | |
| gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001] | |
| gps_coordinates.append(gps_coord) | |
| # Add GPS to detected items for mapping | |
| for item in all_detected_items: | |
| item["gps"] = gps_coord | |
| # Save frame if detections are present | |
| detection_types = {item.get("type") for item in all_detected_items if "type" in item} | |
| if detection_types: | |
| try: | |
| captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg") | |
| success = cv2.imwrite(captured_frame_path, frame) | |
| if not success: | |
| raise RuntimeError(f"Failed to save captured frame: {captured_frame_path}") | |
| for item in all_detected_items: | |
| dtype = item.get("type", "") | |
| if dtype == "plant": | |
| detected_plants.append(captured_frame_path) | |
| if len(detected_plants) > 100: | |
| detected_plants.pop(0) | |
| else: | |
| detected_issues.append(captured_frame_path) | |
| if len(detected_issues) > 100: | |
| detected_issues.pop(0) | |
| except Exception as e: | |
| log_entries.append(f"Error saving captured frame: {str(e)}") | |
| logging.error(f"Error saving captured frame: {str(e)}") | |
| # Prepare data for Salesforce dispatch | |
| all_detections = { | |
| "detections": all_detected_items, | |
| "metrics": metrics, | |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
| "frame_count": frame_count, | |
| "gps_coordinates": gps_coord, | |
| "shadow_issue": shadow_issue, | |
| "thermal": thermal_flag | |
| } | |
| # Dispatch to Salesforce | |
| try: | |
| send_to_salesforce(all_detections) | |
| except Exception as e: | |
| log_entries.append(f"Salesforce Dispatch Error: {str(e)}") | |
| logging.error(f"Salesforce dispatch error: {str(e)}") | |
| # Save processed frame | |
| try: | |
| frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg") | |
| success = cv2.imwrite(frame_path, frame) | |
| if not success: | |
| raise RuntimeError(f"Failed to save output frame: {frame_path}") | |
| except Exception as e: | |
| log_entries.append(f"Error saving output frame: {str(e)}") | |
| logging.error(f"Error saving output frame: {str(e)}") | |
| # Update global variables | |
| frame_count += 1 | |
| last_timestamp = datetime.now().strftime("%Y-%m-d %H:%M:%S") | |
| last_frame = frame.copy() | |
| last_metrics = metrics | |
| # Track detections for metrics | |
| plant_detected = len([item for item in all_detected_items if item.get("type") == "plant"]) | |
| crack_detected = len([item for item in all_detected_items if item.get("type") == "crack"]) | |
| hole_detected = len([item for item in all_detected_items if item.get("type") == "hole" or item.get("type") == "pothole"]) | |
| missing_detected = len([item for item in all_detected_items if item.get("type") == "missing_patch"]) | |
| car_detected = len([item for item in all_detected_items if item.get("type") == "car"]) | |
| bike_detected = len([item for item in all_detected_items if item.get("type") == "bike"]) | |
| person_detected = len([item for item in all_detected_items if item.get("type") == "person"]) | |
| dog_detected = len([item for item in all_detected_items if item.get("type") == "dog"]) | |
| detected_counts.append(plant_detected + crack_detected + hole_detected + missing_detected + | |
| car_detected + bike_detected + person_detected + dog_detected) | |
| # Log frame processing details in the requested format | |
| processing_time = time.time() - start_time | |
| detection_summary = { | |
| "timestamp": last_timestamp, | |
| "frame": frame_count, | |
| "plants": plant_detected, | |
| "cracks": crack_detected, | |
| "holes": hole_detected, | |
| "missing_patches": missing_detected, | |
| "cars": car_detected, | |
| "bikes": bike_detected, | |
| "persons": person_detected, | |
| "dogs": dog_detected, | |
| "gps": gps_coord, | |
| "processing_time_ms": processing_time * 1000 | |
| } | |
| log_message = json.dumps(detection_summary, indent=2) | |
| log_entries.append(log_message) | |
| logging.info(log_message) | |
| # Limit the size of logs and detection data | |
| if len(log_entries) > 100: | |
| log_entries.pop(0) | |
| if len(detected_counts) > 500: | |
| detected_counts.pop(0) | |
| # Resize frame and add metadata for display | |
| frame = cv2.resize(last_frame, (640, 480)) | |
| cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
| cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
| # Generate map | |
| map_items = [item for item in last_metrics.get("items", []) if item.get("type") in ["crack", "hole", "pothole", "missing_patch"]] | |
| map_path = generate_map(gps_coordinates[-5:], map_items) | |
| return ( | |
| frame[:, :, ::-1], # Convert BGR to RGB for Gradio | |
| json.dumps(last_metrics, indent=2), | |
| "\n".join(log_entries[-10:]), | |
| detected_plants, | |
| detected_issues, | |
| generate_line_chart(), | |
| map_path | |
| ) | |
| # Gradio UI setup | |
| with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app: | |
| gr.Markdown( | |
| """ | |
| # 🛡️ NHAI Drone Road Inspection Dashboard | |
| Monitor highway conditions in real-time using drone footage or static images. All services are enabled as requested. | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| media_input = gr.File(label="Upload Media File (e.g., sample.mp4, image.jpg)", file_types=[".mp4", ".avi", ".jpg", ".jpeg", ".png"]) | |
| load_button = gr.Button("Load Media", variant="primary") | |
| with gr.Column(scale=1): | |
| media_status = gr.Textbox( | |
| label="Media Load Status", | |
| value="Please upload a video/image file or ensure 'sample.mp4' exists in the root directory.", | |
| interactive=False | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=True) | |
| uc_status = gr.Textbox(label="Under Construction Status", value="Enabled", interactive=False) | |
| with gr.Column(): | |
| om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=True) | |
| om_status = gr.Textbox(label="Operations Maintenance Status", value="Enabled", interactive=False) | |
| with gr.Column(): | |
| rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=True) | |
| rs_status = gr.Textbox(label="Road Safety Status", value="Enabled", interactive=False) | |
| with gr.Column(): | |
| pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=True) | |
| pl_status = gr.Textbox(label="Plantation Status", value="Enabled", interactive=False) | |
| status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a media file to start)") | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| media_output = gr.Image(label="Live Feed", width=640, height=480, elem_id="live-feed") | |
| with gr.Column(scale=1): | |
| metrics_output = gr.Textbox( | |
| label="Detection Metrics", | |
| lines=10, | |
| interactive=False, | |
| placeholder="Detection metrics, counts will appear here." | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False) | |
| with gr.Column(scale=1): | |
| plant_images = gr.Gallery(label="Detected Plants (Last 100+)", columns=4, rows=13, height="auto") | |
| issue_images = gr.Gallery(label="Detected Issues (Last 100+)", columns=4, rows=13, height="auto") | |
| with gr.Row(): | |
| chart_output = gr.Image(label="Detection Trend") | |
| map_output = gr.Image(label="Issue Locations Map") | |
| with gr.Row(): | |
| pause_btn = gr.Button("⏸️ Pause", variant="secondary") | |
| resume_btn = gr.Button("▶️ Resume", variant="primary") | |
| frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05) | |
| gr.HTML(""" | |
| <style> | |
| #live-feed { | |
| border: 2px solid #4682B4; | |
| border-radius: 10px; | |
| } | |
| .gr-button-primary { | |
| background-color: #4682B4 !important; | |
| } | |
| .gr-button-secondary { | |
| background-color: #FF6347 !important; | |
| } | |
| </style> | |
| """) | |
| def toggle_pause() -> str: | |
| global paused | |
| paused = True | |
| return "**Status:** ⏸️ Paused" | |
| def toggle_resume() -> str: | |
| global paused | |
| paused = False | |
| return "**Status:** 🟢 Streaming" | |
| def set_frame_rate(val: float) -> None: | |
| global frame_rate | |
| frame_rate = val | |
| media_status.value = initialize_media() | |
| load_button.click( | |
| initialize_media, | |
| inputs=[media_input], | |
| outputs=[media_status] | |
| ) | |
| def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]: | |
| active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val) | |
| uc_status_val = "Enabled" if "under_construction" in status_message.lower() else "Disabled" | |
| om_status_val = "Enabled" if "operations_maintenance" in status_message.lower() else "Disabled" | |
| rs_status_val = "Enabled" if "road_safety" in status_message.lower() else "Disabled" | |
| pl_status_val = "Enabled" if "plantation" in status_message.lower() else "Disabled" | |
| return ( | |
| uc_status_val, om_status_val, rs_status_val, pl_status_val, status_message | |
| ) | |
| toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle] | |
| toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text] | |
| uc_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) | |
| om_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) | |
| rs_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) | |
| pl_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) | |
| pause_btn.click(toggle_pause, outputs=status_text) | |
| resume_btn.click(toggle_resume, outputs=status_text) | |
| frame_slider.change(set_frame_rate, inputs=[frame_slider]) | |
| def streaming_loop(): | |
| while True: | |
| if not media_loaded: | |
| yield None, json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2), "\n".join(log_entries[-10:]), detected_plants, detected_issues, None, None | |
| else: | |
| frame, metrics, logs, plants, issues, chart, map_path = monitor_feed() | |
| if frame is None: | |
| yield None, metrics, logs, plants, issues, chart, map_path | |
| else: | |
| yield frame, metrics, logs, plants, issues, chart, map_path | |
| if not is_video: | |
| # For static images, yield once and pause | |
| break | |
| time.sleep(frame_rate) | |
| app.load(streaming_loop, outputs=[media_output, metrics_output, logs_output, plant_images, issue_images, chart_output, map_output]) | |
| if __name__ == "__main__": | |
| app.launch(share=True) # Set share=True to create a public link |