import gradio as gr
import os
import sys
import logging
import time
from datetime import datetime

import psutil

# Internal config for paths and markers
try:
    from src.config import DATA_DIR, LOG_DIR, LAST_RUN_PATH
except Exception:
    # Fallbacks if import path differs in Spaces
    try:
        from config import DATA_DIR, LOG_DIR, LAST_RUN_PATH  # type: ignore
    except Exception:
        DATA_DIR = os.environ.get('DATA_DIR', '/data')
        LOG_DIR = os.environ.get('LOG_DIR', os.path.join(DATA_DIR, 'logs'))
        LAST_RUN_PATH = os.environ.get('LAST_RUN_PATH', '/tmp/last_run.txt')

# Add src to Python path for imports
sys.path.insert(0, '/app/src')
sys.path.insert(0, '/app')

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)


def get_health_status():
    """Get basic health status."""
    try:
        # Process-level resource usage
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024
        cpu_percent = process.cpu_percent()

        # System-level resource usage
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Check scheduler status via the last-run marker file
        scheduler_running = False
        last_run_time = "Unknown"
        try:
            if os.path.exists(LAST_RUN_PATH):
                with open(LAST_RUN_PATH, 'r') as f:
                    last_run_str = f.read().strip()
                if last_run_str:
                    try:
                        last_run = datetime.strptime(last_run_str, '%Y-%m-%d %H:%M:%S')
                        time_since_last_run = (datetime.now() - last_run).total_seconds()
                        scheduler_running = time_since_last_run < 2700  # 45 minutes
                        last_run_time = last_run_str
                    except Exception:
                        logger.debug("Malformed last_run value: %s", last_run_str)
        except Exception as e:
            logger.warning(f"Could not check scheduler status: {e}")

        return {
            "status": "healthy" if memory_mb < 400 else "warning",
            "timestamp": datetime.now().isoformat(),
            "process_memory_mb": round(memory_mb, 2),
            "process_cpu_percent": round(cpu_percent, 2),
            "system_memory_percent": round(memory.percent, 1),
            "system_memory_available_gb": round(memory.available / (1024**3), 2),
            "disk_free_gb": round(disk.free / (1024**3), 2),
            "scheduler_running": scheduler_running,
            "scheduler_last_run": last_run_time
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }


def get_pipeline_status():
    """Get data pipeline status."""
    try:
        data_dirs = [
            os.path.join(DATA_DIR, 'merged', 'features'),
            os.path.join(DATA_DIR, 'merged', 'train'),
            os.path.join(DATA_DIR, 'alpaca'),
            os.path.join(DATA_DIR, 'advisorai-data'),
        ]

        recent_files = 0
        total_size = 0
        for data_dir in data_dirs:
            if os.path.exists(data_dir):
                for root, dirs, files in os.walk(data_dir):
                    for file in files:
                        if file.endswith(('.json', '.parquet', '.csv')):
                            file_path = os.path.join(root, file)
                            try:
                                stat = os.stat(file_path)
                                # Count files modified in the last 24 hours
                                if time.time() - stat.st_mtime < 86400:
                                    recent_files += 1
                                total_size += stat.st_size
                            except Exception:
                                continue

        return {
            "status": "running" if recent_files > 0 else "stale",
            "recent_files_24h": recent_files,
            "total_data_size_gb": round(total_size / (1024**3), 2),
            "last_check": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Pipeline status check failed: {e}")
        return {
            "status": "error",
            "error": str(e),
            "last_check": datetime.now().isoformat()
        }
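# For reference: get_health_status() above expects LAST_RUN_PATH to contain a
# single timestamp in '%Y-%m-%d %H:%M:%S' format and treats the scheduler as
# running while that marker is younger than 2700 seconds (45 minutes). A
# minimal sketch of the writer side, assuming the scheduler (which lives
# outside this file) refreshes the marker once per cycle:
#
#     with open(LAST_RUN_PATH, 'w') as f:
#         f.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))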
def get_recent_files():
    """Get list of recent files in the data directories."""
    try:
        base_paths = [
            os.path.join(DATA_DIR, 'merged', 'features'),
            os.path.join(DATA_DIR, 'merged', 'train'),
            os.path.join(DATA_DIR, 'alpaca'),
            os.path.join(DATA_DIR, 'advisorai-data', 'features'),
        ]

        recent_files = []
        for base_path in base_paths:
            if os.path.exists(base_path):
                for root, dirs, files in os.walk(base_path):
                    for file in files[:10]:  # Limit to 10 files per directory
                        file_path = os.path.join(root, file)
                        try:
                            stat = os.stat(file_path)
                            recent_files.append({
                                "File": file,
                                "Path": file_path.replace(DATA_DIR.rstrip('/') + '/', ""),
                                "Size": f"{stat.st_size / (1024**2):.2f} MB",
                                "Modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M")
                            })
                        except Exception:
                            continue

        # Sort by modification time (the timestamp format sorts
        # lexicographically) and keep the 20 most recent entries
        recent_files.sort(key=lambda x: x["Modified"], reverse=True)
        return recent_files[:20]
    except Exception as e:
        logger.error(f"Error getting recent files: {e}")
        return [{"Error": str(e)}]


def get_logs():
    """Get recent log entries."""
    try:
        # Use LOG_DIR (default: /data/logs) rather than hard-coded paths so
        # the config/env-var overrides above are respected
        log_files = [
            os.path.join(LOG_DIR, 'scheduler.log'),
            os.path.join(LOG_DIR, 'data_pipeline.log'),
            os.path.join(LOG_DIR, 'monitor.log'),
        ]

        logs = []
        for log_file in log_files:
            if os.path.exists(log_file):
                try:
                    with open(log_file, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                    # Keep only the last 10 lines of each file
                    recent_lines = lines[-10:]
                    logs.append(f"=== {os.path.basename(log_file)} ===\n")
                    logs.extend(recent_lines)
                    logs.append("\n")
                except Exception as e:
                    logs.append(f"Error reading {log_file}: {str(e)}\n")

        return "".join(logs) if logs else "No log files found"
    except Exception as e:
        logger.error(f"Error getting logs: {e}")
        return f"Error getting logs: {str(e)}"


# Create Gradio interface
with gr.Blocks(title="AdvisorAI Data Pipeline Monitor", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🤖 AdvisorAI Data Pipeline Monitor")
    gr.Markdown("Real-time monitoring of the AdvisorAI data collection and processing pipeline")

    with gr.Tabs():
        with gr.TabItem("📊 Dashboard"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Health Status")
                    health_display = gr.JSON(label="System Health & Status")
                with gr.Column():
                    gr.Markdown("### Pipeline Status")
                    pipeline_display = gr.JSON(label="Data Pipeline Status")
            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh", variant="primary")

        with gr.TabItem("📁 Recent Files"):
            gr.Markdown("### Recently Modified Data Files")
            files_display = gr.Dataframe(
                headers=["File", "Path", "Size", "Modified"],
                datatype=["str", "str", "str", "str"],
                label="Recent Files"
            )
            refresh_files_btn = gr.Button("🔄 Refresh Files")

        with gr.TabItem("📝 Logs"):
            gr.Markdown("### Recent Log Entries")
            logs_display = gr.Textbox(
                label="Recent Logs",
                lines=20,
                max_lines=30,
                show_copy_button=True
            )
            refresh_logs_btn = gr.Button("🔄 Refresh Logs")

    # Event handlers
    def refresh_dashboard():
        health = get_health_status()
        pipeline = get_pipeline_status()
        # JSON components accept dicts directly
        return health, pipeline

    def refresh_files():
        files = get_recent_files()
        if not files:
            return []
        if isinstance(files, list) and isinstance(files[0], dict) and "Error" not in files[0]:
            rows = []
            for f in files:
                rows.append([f.get("File", ""), f.get("Path", ""), f.get("Size", ""), f.get("Modified", "")])
            return rows
        return [["Error", str(files), "", ""]]
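    # Note: gr.Dataframe expects rows as a list of lists aligned with the
    # headers declared above; e.g. a single row (illustrative values only):
    #
    #     [["features.parquet", "merged/features/features.parquet",
    #       "12.34 MB", "2024-05-01 12:30"]]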
    def refresh_logs():
        return get_logs()

    # Connect event handlers
    refresh_btn.click(
        refresh_dashboard,
        outputs=[health_display, pipeline_display]
    )
    refresh_files_btn.click(
        refresh_files,
        outputs=[files_display]
    )
    refresh_logs_btn.click(
        refresh_logs,
        outputs=[logs_display]
    )

    # Auto-refresh on load
    app.load(
        refresh_dashboard,
        outputs=[health_display, pipeline_display]
    )
    app.load(
        refresh_files,
        outputs=[files_display]
    )


if __name__ == "__main__":
    logger.info("Starting Gradio app...")
    port = int(os.environ.get("PORT", "7860"))
    app.launch(
        server_name="0.0.0.0",
        server_port=port,
        share=False,
        show_error=True,
        quiet=False
    )
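# Usage sketch (assumptions: this file is the Space's app.py and dependencies
# are installed; when run outside the container, DATA_DIR must point at a
# readable data directory):
#
#     DATA_DIR=./data PORT=7860 python app.py
#
# The server binds 0.0.0.0, so inside a Docker/Spaces container the dashboard
# is reachable on the chosen PORT from outside the container as well.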