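"""Gradio monitoring dashboard for the AdvisorAI data pipeline.

Shows process/system health, data-freshness status, recently modified data
files, and the tail of the pipeline logs. Intended to run in the project's
container (e.g. on Hugging Face Spaces).
"""
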
import logging
import os
import sys
import time
from datetime import datetime

import gradio as gr
import psutil
# Add src to the Python path so the config import below resolves in-container
sys.path.insert(0, '/app/src')
sys.path.insert(0, '/app')

# Internal config for paths and markers
try:
    from src.config import DATA_DIR, LOG_DIR, LAST_RUN_PATH
except Exception:
    # Fallbacks if import path differs in Spaces
    try:
        from config import DATA_DIR, LOG_DIR, LAST_RUN_PATH  # type: ignore
    except Exception:
        DATA_DIR = os.environ.get('DATA_DIR', '/data')
        LOG_DIR = os.environ.get('LOG_DIR', os.path.join(DATA_DIR, 'logs'))
        LAST_RUN_PATH = os.environ.get('LAST_RUN_PATH', '/tmp/last_run.txt')

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)


def get_health_status():
    """Get basic health status"""
    try:
        # Get process info
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024
        cpu_percent = process.cpu_percent()
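        # Note: cpu_percent() with no interval measures usage since the
        # previous call, so the very first reading comes back as 0.0.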

        # Get system info
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
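        # Note: disk_usage('/') samples the root filesystem; if DATA_DIR is a
        # separate mount, its free space may differ from this figure.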

        # Check scheduler status
        scheduler_running = False
        last_run_time = "Unknown"
        try:
            last_run_file = LAST_RUN_PATH
            if os.path.exists(last_run_file):
                with open(last_run_file, 'r') as f:
                    last_run_str = f.read().strip()
                last_run = datetime.strptime(last_run_str, '%Y-%m-%d %H:%M:%S')
                time_since_last_run = (datetime.now() - last_run).total_seconds()
                scheduler_running = time_since_last_run < 2700  # 45 minutes
                last_run_time = last_run_str
        except Exception as e:
            logger.warning(f"Could not check scheduler status: {e}")
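        # Anything under 400 MB RSS counts as healthy; above that the status
        # flips to "warning".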
        return {
            "status": "healthy" if memory_mb < 400 else "warning",
            "timestamp": datetime.now().isoformat(),
            "process_memory_mb": round(memory_mb, 2),
            "process_cpu_percent": round(cpu_percent, 2),
            "system_memory_percent": round(memory.percent, 1),
            "system_memory_available_gb": round(memory.available / (1024**3), 2),
            "disk_free_gb": round(disk.free / (1024**3), 2),
            "scheduler_running": scheduler_running,
            "scheduler_last_run": last_run_time
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }


def get_pipeline_status():
    """Get data pipeline status"""
    try:
        data_dirs = [
            os.path.join(DATA_DIR, 'merged', 'features'),
            os.path.join(DATA_DIR, 'merged', 'train'),
            os.path.join(DATA_DIR, 'alpaca'),
            os.path.join(DATA_DIR, 'advisorai-data'),
        ]
        recent_files = 0
        total_size = 0
        for data_dir in data_dirs:
            if os.path.exists(data_dir):
                for root, dirs, files in os.walk(data_dir):
                    for file in files:
                        if file.endswith(('.json', '.parquet', '.csv')):
                            file_path = os.path.join(root, file)
                            try:
                                stat = os.stat(file_path)
                                # Count files modified in last 24 hours
                                if time.time() - stat.st_mtime < 86400:
                                    recent_files += 1
                                total_size += stat.st_size
                            except Exception:
                                continue
        return {
            "status": "running" if recent_files > 0 else "stale",
            "recent_files_24h": recent_files,
            "total_data_size_gb": round(total_size / (1024**3), 2),
            "last_check": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Pipeline status check failed: {e}")
        return {
            "status": "error",
            "error": str(e),
            "last_check": datetime.now().isoformat()
        }


def get_recent_files():
    """Get list of recent files in the data directories"""
    try:
        base_paths = [
            os.path.join(DATA_DIR, 'merged', 'features'),
            os.path.join(DATA_DIR, 'merged', 'train'),
            os.path.join(DATA_DIR, 'alpaca'),
            os.path.join(DATA_DIR, 'advisorai-data', 'features'),
        ]
        recent_files = []
        for base_path in base_paths:
            if os.path.exists(base_path):
                for root, dirs, files in os.walk(base_path):
                    for file in files[:10]:  # Limit to 10 files per directory
                        file_path = os.path.join(root, file)
                        try:
                            stat = os.stat(file_path)
                            recent_files.append({
                                "File": file,
                                "Path": file_path.replace(DATA_DIR.rstrip('/') + '/', ""),
                                "Size": f"{stat.st_size / (1024**2):.2f} MB",
                                "Modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M")
                            })
                        except Exception:
                            continue
        # Sort by modification time and take most recent 20
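        # (a plain string sort is chronological here because the
        # "%Y-%m-%d %H:%M" timestamps are zero-padded and fixed-width)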
        recent_files.sort(key=lambda x: x["Modified"], reverse=True)
        return recent_files[:20]
    except Exception as e:
        logger.error(f"Error getting recent files: {e}")
        return [{"Error": str(e)}]


def get_logs():
    """Get recent log entries"""
    try:
        # Read from the configured LOG_DIR rather than hardcoded /data/logs paths
        log_files = [
            os.path.join(LOG_DIR, 'scheduler.log'),
            os.path.join(LOG_DIR, 'data_pipeline.log'),
            os.path.join(LOG_DIR, 'monitor.log'),
        ]
        logs = []
        for log_file in log_files:
            if os.path.exists(log_file):
                try:
                    with open(log_file, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                    # Keep only the last 10 lines of each file
                    recent_lines = lines[-10:]
                    logs.append(f"=== {os.path.basename(log_file)} ===\n")
                    logs.extend(recent_lines)
                    logs.append("\n")
                except Exception as e:
                    logs.append(f"Error reading {log_file}: {str(e)}\n")
        return "".join(logs) if logs else "No log files found"
    except Exception as e:
        logger.error(f"Error getting logs: {e}")
        return f"Error getting logs: {str(e)}"


# Create Gradio interface
with gr.Blocks(title="AdvisorAI Data Pipeline Monitor", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🤖 AdvisorAI Data Pipeline Monitor")
    gr.Markdown("Real-time monitoring of the AdvisorAI data collection and processing pipeline")

    with gr.Tabs():
        with gr.TabItem("📊 Dashboard"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Health Status")
                    health_display = gr.JSON(label="System Health & Status")
                with gr.Column():
                    gr.Markdown("### Pipeline Status")
                    pipeline_display = gr.JSON(label="Data Pipeline Status")
            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh", variant="primary")

        with gr.TabItem("📁 Recent Files"):
            gr.Markdown("### Recently Modified Data Files")
            files_display = gr.Dataframe(
                headers=["File", "Path", "Size", "Modified"],
                datatype=["str", "str", "str", "str"],
                label="Recent Files"
            )
            refresh_files_btn = gr.Button("🔄 Refresh Files")

        with gr.TabItem("📋 Logs"):
            gr.Markdown("### Recent Log Entries")
            logs_display = gr.Textbox(
                label="Recent Logs",
                lines=20,
                max_lines=30,
                show_copy_button=True
            )
            refresh_logs_btn = gr.Button("🔄 Refresh Logs")

    # Event handlers
    def refresh_dashboard():
        health = get_health_status()
        pipeline = get_pipeline_status()
        # JSON components accept dicts directly
        return health, pipeline

    def refresh_files():
        files = get_recent_files()
        if not files:
            return []
        # Normal case: a list of file dicts, mapped to Dataframe rows
        if isinstance(files[0], dict) and "Error" not in files[0]:
            return [[f.get("File", ""), f.get("Path", ""), f.get("Size", ""), f.get("Modified", "")]
                    for f in files]
        # Error case: get_recent_files() returned [{"Error": ...}]
        return [["Error", files[0].get("Error", ""), "", ""]]

    def refresh_logs():
        return get_logs()

    # Connect event handlers
    refresh_btn.click(
        refresh_dashboard,
        outputs=[health_display, pipeline_display]
    )
    refresh_files_btn.click(
        refresh_files,
        outputs=[files_display]
    )
    refresh_logs_btn.click(
        refresh_logs,
        outputs=[logs_display]
    )

    # Auto-refresh on load
    app.load(
        refresh_dashboard,
        outputs=[health_display, pipeline_display]
    )
    app.load(
        refresh_files,
        outputs=[files_display]
    )


if __name__ == "__main__":
    logger.info("Starting Gradio app...")
    port = int(os.environ.get("PORT", "7860"))
    app.launch(
        server_name="0.0.0.0",
        server_port=port,
        share=False,
        show_error=True,
        quiet=False
    )
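
# Typical local run (assumes the project's dependencies are installed):
#   PORT=7860 python app.py
# DATA_DIR, LOG_DIR, and LAST_RUN_PATH can be overridden via environment
# variables when src.config is not importable.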