Spaces:
Running
Running
import api_client | |
import json | |
from datetime import datetime, timedelta | |
import hashlib | |
import psycopg2 | |
import psycopg2.extras | |
import os | |
from dotenv import load_dotenv | |
from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
import requests | |
from apscheduler.schedulers.blocking import BlockingScheduler | |
# Load environment variables from .env file | |
load_dotenv(override=True) | |
def connect_to_db():
    """
    Connect to the PostgreSQL database using environment variables.

    Reads DB_PASSWORD (required), DB_HOST, DB_PORT, DB_NAME and DB_USER.
    DB_PORT falls back to 5432 when it is unset or blank; host, database name
    and user are passed to psycopg2 exactly as read from the environment.

    Returns a connection object whose cursors are DictCursor instances.

    Raises:
        ValueError: If DB_PASSWORD is missing or DB_PORT is not an integer.
    """
    db_password = os.getenv("DB_PASSWORD")
    if not db_password:
        raise ValueError(
            "Database password not found in environment variables. Please set DB_PASSWORD."
        )

    # Get database connection details from environment variables
    db_host = os.getenv("DB_HOST")
    db_name = os.getenv("DB_NAME")
    db_user = os.getenv("DB_USER")

    # int(None) would raise an opaque TypeError when DB_PORT is not set, so
    # fall back to the standard PostgreSQL port instead.
    raw_port = (os.getenv("DB_PORT") or "").strip() or "5432"
    try:
        db_port = int(raw_port)
    except ValueError:
        raise ValueError(f"DB_PORT must be an integer, got {raw_port!r}") from None

    return psycopg2.connect(
        host=db_host,
        port=db_port,
        database=db_name,
        user=db_user,
        password=db_password,
        cursor_factory=psycopg2.extras.DictCursor,
    )
def verify_mcp_api_key(api_key):
    """
    Verify the MCP API key with the key generation server.

    The server base URL is read from the KEY_SERVER_URL environment variable
    and the key is POSTed as JSON to its /api/verifyKey endpoint.

    Parameters:
    - api_key: The MCP API key to verify

    Returns:
    - Dictionary with success status and message. This function never raises:
      configuration, network and decoding problems are all reported as
      success=False with an explanatory message.
    """
    # Without this check an unset variable would build the URL
    # "None/api/verifyKey" and surface as a misleading connection failure.
    key_server_url = (os.getenv("KEY_SERVER_URL") or "").strip()
    if not key_server_url:
        return {
            "success": False,
            "message": "Key verification service is not configured. Please set KEY_SERVER_URL.",
        }

    # Tolerate a trailing slash in the configured base URL.
    verify_url = key_server_url.rstrip("/") + "/api/verifyKey"

    try:
        response = requests.post(
            verify_url,
            json={"apiKey": api_key},
            headers={"Content-Type": "application/json"},
            timeout=10,
        )

        if response.status_code != 200:
            return {
                "success": False,
                "message": f"Key verification failed with status {response.status_code}",
            }

        data = response.json()
        if data.get("valid"):
            return {"success": True, "message": "API key is valid"}
        return {"success": False, "message": "API key is invalid"}
    except requests.exceptions.RequestException as e:
        return {
            "success": False,
            "message": f"Failed to connect to key verification service: {str(e)}",
        }
    except Exception as e:
        # Covers e.g. a 200 response whose body is not a JSON object.
        return {"success": False, "message": f"Key verification error: {str(e)}"}
def validate_api_configuration(
    mcp_api_key,
    name,
    description,
    method,
    base_url,
    endpoint,
    param_keys_values,
    header_keys_values,
    additional_params,
    schedule_interval_minutes,
    stop_after_hours,
    start_at,  # IMPORTANT: Use empty string "" for immediate start (most common case)
):
    """
    TOOL: Validate and store API configuration for monitoring.

    PURPOSE: Test an API endpoint and store the configuration if successful. This is STEP 1
    of the monitoring setup process. If validation fails, retry with corrected parameters. If successful, use the returned config_id in activate_monitoring() function.

    CRITICAL: Even if success=True, you MUST manually check the 'sample_response' field
    before proceeding to activate_monitoring(). The API call may return success=True but contain
    error messages (like "401 Unauthorized", "Invalid API key", etc.) in the sample_response.
    CRITICAL: Always try to add parameters that will limit the API response to a manageable size.
    CRITICAL: Be sure to always clearly inform the user of the config_id after a desired validation result.
    CRITICAL: If you don't have an MCP API KEY given by the user, prompt them to get it here: https://mcp-hackathon.vercel.app/

    WORKFLOW:
    1. Call this function to validate API configuration
    2. If success=False: Fix parameters and retry this function
    3. If success=True: MANUALLY INSPECT the 'sample_response' field for errors
    4. If sample_response contains error messages: Fix API parameters and retry validation
    5. If sample_response looks valid: Use config_id in activate_monitoring() to activate monitoring

    ARGUMENTS:
    - mcp_api_key: MCP API key serves as user identifier.
    - name: User-friendly name for the monitoring task
    - description: Description of what is being monitored
    - method: HTTP method (GET, POST, PUT, DELETE)
    - base_url: The base URL of the API
    - endpoint: The specific API endpoint
    - param_keys_values: Parameter key-value pairs, one per line
    - header_keys_values: Header key-value pairs, one per line
    - additional_params: Optional JSON string for complex parameters
    - schedule_interval_minutes: Minutes between calls
    - stop_after_hours: Hours after which to stop (supports decimals, max 168 = 1 week)
    - start_at: Optional datetime string for when to start the monitoring.
      IMPORTANT: Leave as empty string "" for immediate start (most common use case, always default to this if no start time provided). Only provide a datetime string (e.g., "2024-06-15 09:00:00") if you need to schedule monitoring for a specific future time.
      A value carrying a UTC offset (or a trailing "Z") is converted to the server's local time.

    Input Examples:
    1. Simple GET request to monitor stock price:
        mcp_api_key: "your_mcp_key_here"
        name: "NVDA Stock Price"
        description: "Monitor NVIDIA stock price every 30 minutes"
        method: "GET"
        base_url: "https://api.example.com"
        endpoint: "stocks/NVDA"
        param_keys_values: "symbol: NVDA\ninterval: 1min"
        header_keys_values: "Authorization: Bearer your_token"
        additional_params: "{}"
        schedule_interval_minutes: 30
        stop_after_hours: 1.5
        start_at: ""
    2. Weather monitoring with free API:
        mcp_api_key: ""
        name: "Weather Monitor"
        description: "Monitor current weather conditions every 2 hours for one week using Open-Meteo free API"
        method: "GET"
        base_url: "https://api.open-meteo.com"
        endpoint: "v1/forecast"
        param_keys_values: "latitude: 40.7128\nlongitude: -74.0060\ncurrent: temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m\ntimezone: America/New_York"
        header_keys_values: "Content-Type: application/json"
        additional_params: "{}"
        schedule_interval_minutes: 120
        stop_after_hours: 168
        start_at: "2024-06-15 09:00:00"

    Returns:
    - Dictionary with success status, config_id (needed for activate_monitoring), message, and sample_response
    Example return:
    {
        "success": True,
        "config_id": 123,
        "message": "API call tested and stored successfully",
        "sample_response": {...},
        "stop_at": "2025-06-11T12:00:00Z",
        "start_at": "2025-06-04T12:00:00Z"
    }

    NEXT STEP: If success=True, call activate_monitoring(config_id, mcp_api_key) to activate monitoring
    """
    try:
        # Validate input parameters; fall back to the server-side key when the
        # caller did not supply one.
        if not mcp_api_key or not mcp_api_key.strip():
            mcp_api_key = os.getenv("MCP_API_KEY", "")
            if not mcp_api_key or not mcp_api_key.strip():
                return {
                    "success": False,
                    "message": "MCP API key is required",
                    "config_id": None,
                }

        # Verify the MCP API key with the key generation server
        key_verification = verify_mcp_api_key(mcp_api_key)
        if not key_verification["success"]:
            return {
                "success": False,
                "message": f"API key verification failed: {key_verification['message']}",
                "config_id": None,
            }

        # Validate required parameters
        if not name or not name.strip():
            return {
                "success": False,
                "message": "Monitoring name is required",
                "config_id": None,
            }
        if not base_url or not base_url.strip():
            return {
                "success": False,
                "message": "Base URL is required",
                "config_id": None,
            }
        if not method or method not in ["GET", "POST", "PUT", "DELETE"]:
            return {
                "success": False,
                "message": "Valid HTTP method is required (GET, POST, PUT, DELETE)",
                "config_id": None,
            }
        if (
            not isinstance(schedule_interval_minutes, (int, float))
            or schedule_interval_minutes <= 0
            or schedule_interval_minutes > 1440
        ):
            return {
                "success": False,
                "message": "Schedule interval must be between 0 and 1440 minutes",
                "config_id": None,
            }
        if (
            not isinstance(stop_after_hours, (int, float))
            or stop_after_hours < 0.1
            or stop_after_hours > 168
        ):
            return {
                "success": False,
                "message": "Stop after hours must be between 0.1 and 168 hours (1 week max)",
                "config_id": None,
            }

        # Validate start_at if provided
        if start_at:
            try:
                parsed_start_time = datetime.fromisoformat(
                    start_at.replace("Z", "+00:00")
                )
            except ValueError:
                return {
                    "success": False,
                    "message": "Invalid start time format",
                    "config_id": None,
                }
            # Every other timestamp in this module is a naive local
            # datetime.now(); comparing an offset-aware value against those
            # raises TypeError, so convert aware inputs to naive local time.
            if parsed_start_time.tzinfo is not None:
                parsed_start_time = parsed_start_time.astimezone().replace(tzinfo=None)
            if parsed_start_time < datetime.now():
                return {
                    "success": False,
                    "message": "Start time cannot be in the past",
                    "config_id": None,
                }
        else:
            parsed_start_time = datetime.now()

        # Test the API call
        result = api_client.call_api(
            method=method,
            base_url=base_url,
            endpoint=endpoint,
            param_keys_values=param_keys_values,
            header_keys_values=header_keys_values,
            additional_params=additional_params,
        )

        # Check if the API call failed
        if isinstance(result, str) and result.startswith("Error"):
            return {
                "success": False,
                "message": f"API call test failed: {result}",
                "config_id": None,
            }

        # Decode the sample response defensively: the result may not be a
        # string, and text that merely looks like JSON may not parse. Neither
        # case may turn an otherwise valid configuration into a failure.
        sample_response = result
        if isinstance(result, str) and result.startswith(("{", "[")):
            try:
                sample_response = json.loads(result)
            except json.JSONDecodeError:
                pass  # keep the raw text

        # Generate unique config ID and calculate timestamps
        config_str = (
            f"{mcp_api_key}_{name}_{base_url}_{endpoint}_{datetime.now().timestamp()}"
        )
        config_id = int(hashlib.md5(config_str.encode()).hexdigest()[:7], 16)
        created_at = datetime.now()
        stop_at = parsed_start_time + timedelta(hours=float(stop_after_hours))

        # Store configuration (inactive until activate_monitoring() is called)
        conn = None
        try:
            conn = connect_to_db()
            cur = conn.cursor()
            try:
                cur.execute(
                    """
                    INSERT INTO api_configurations (
                        config_id, mcp_api_key, name, description, method,
                        base_url, endpoint, params, headers, additional_params,
                        is_active, schedule_interval_minutes, start_at, stop_at, created_at
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s
                    )
                    """,
                    (
                        config_id,
                        mcp_api_key,
                        name,
                        description,
                        method,
                        base_url,
                        endpoint,
                        json.dumps(api_client.parse_key_value_string(param_keys_values)),
                        json.dumps(api_client.parse_key_value_string(header_keys_values)),
                        additional_params,
                        False,
                        float(schedule_interval_minutes),
                        parsed_start_time,
                        stop_at.isoformat(),
                        created_at,
                    ),
                )
                conn.commit()
            finally:
                cur.close()
        except Exception as db_error:
            return {
                "success": False,
                "message": f"Database error: {str(db_error)}",
                "config_id": None,
            }
        finally:
            # Release the connection on both the success and the error path.
            if conn is not None:
                conn.close()

        # Return success response
        return {
            "success": True,
            "config_id": config_id,
            "message": f"API call tested, validated, and stored successfully for '{name}'. Make sure to review the message manually before activating monitoring. Use this config_id in activate_monitoring() to activate monitoring.",
            "sample_response": sample_response,
            "start_at": parsed_start_time.isoformat(),
            "stop_at": stop_at.isoformat(),
            "schedule_interval_minutes": schedule_interval_minutes,
        }
    except Exception as e:
        return {
            "success": False,
            "message": f"Validation failed with error: {str(e)}",
            "config_id": None,
        }
async def activate_monitoring(config_id, mcp_api_key):
    """
    TOOL: Activate periodic monitoring for a validated API configuration.

    PURPOSE: Start automated recurring API calls based on a previously validated configuration.
    This is STEP 2 of the monitoring setup process.

    PREREQUISITE: Must call validate_api_configuration() first and obtain a config_id from successful validation. Make sure that the sample_response is what you expect
    to see before proceeding with this function.

    WORKFLOW:
    1. First call validate_api_configuration() to get config_id
    2. If validation successful, call this function with the config_id
    3. Monitoring will run automatically according to the validated schedule

    ARGUMENTS:
    - config_id: The ID from successful validate_api_configuration() execution (required)
    - mcp_api_key: User's MCP API key for verification (must match validation step).

    Input Examples:
    1. Activate scheduler for stock monitoring:
        config_id: 123456789
        mcp_api_key: "your_mcp_key_here"
    2. Activate scheduler for weather alerts:
        config_id: 987654321
        mcp_api_key: "your_mcp_key_here"

    NOTE: The config_id must be obtained from a successful validate_api_configuration() response.
    The mcp_api_key must match the one used during validation.

    Returns:
    - Dictionary with success status and scheduling details
    Example return:
    {
        "success": True,
        "message": "Scheduler activated for 'NVDA Stock Price'",
        "config_id": 123,
        "schedule_interval_minutes": 20,
        "stop_at": "2025-06-11T12:00:00Z",
        "next_call_at": "2025-06-04T12:20:00Z"
    }

    ERROR HANDLING: If config_id not found or invalid, returns success=False with error message
    """

    def _to_naive_local(value):
        # Accept a datetime or an ISO string and return a naive local
        # datetime, so it can be compared with datetime.now() inside the job.
        if not isinstance(value, datetime):
            value = datetime.fromisoformat(str(value))
        if value.tzinfo is not None:
            value = value.astimezone().replace(tzinfo=None)
        return value

    conn = None
    cur = None
    # Attempt to create the scheduler
    try:
        if not mcp_api_key or not mcp_api_key.strip():
            mcp_api_key = os.getenv("MCP_API_KEY", "")
            if not mcp_api_key or not mcp_api_key.strip():
                return {
                    "success": False,
                    "message": "MCP API key is required",
                    "config_id": None,
                }

        # Verify the MCP API key with the key generation server first
        key_verification = verify_mcp_api_key(mcp_api_key)
        if not key_verification["success"]:
            return {
                "success": False,
                "message": f"API key verification failed: {key_verification['message']}",
                "config_id": config_id,
            }

        conn = connect_to_db()
        cur = conn.cursor()
        cur.execute(
            "SELECT * FROM api_configurations WHERE config_id = %s", (config_id,)
        )
        config_row = cur.fetchone()
        if not config_row:
            return {
                "success": False,
                "message": "Invalid config_id",
                "config_id": config_id,
            }

        config = dict(config_row)
        if config["mcp_api_key"] != mcp_api_key:
            return {
                "success": False,
                "message": "Invalid mcp_api_key. You are not authorized to activate this configuration.",
                "config_id": config_id,
            }

        # Extract scheduling parameters
        name = config.get("name", "Unknown")
        schedule_interval_minutes = float(config.get("schedule_interval_minutes") or 20)
        stop_at = config.get("stop_at")
        start_at = config.get("start_at")

        if not start_at:
            start_at = datetime.now()
        else:
            start_at = _to_naive_local(start_at)

        if not stop_at:
            stop_at = start_at + timedelta(hours=config.get("stop_after_hours", 24))
        else:
            stop_at = _to_naive_local(stop_at)

        # Job function to make actual API calls
        def api_monitoring_job():
            now = datetime.now()
            next_call = now + timedelta(minutes=schedule_interval_minutes)
            print(
                f"Executing API monitoring job for {name} at {now.isoformat()}. Next call at {next_call.isoformat()}"
            )
            try:
                # Extract API configuration parameters
                method = config.get("method", "GET")
                base_url = config.get("base_url")
                endpoint = config.get("endpoint", "")
                params = config.get("params", {})
                headers = config.get("headers", {})
                additional_params = config.get("additional_params", {})

                # Convert JSON strings back to dicts if needed
                if isinstance(params, str):
                    params = json.loads(params) if params else {}
                if isinstance(headers, str):
                    headers = json.loads(headers) if headers else {}
                if isinstance(additional_params, str):
                    additional_params = (
                        json.loads(additional_params) if additional_params else {}
                    )

                # Convert params and headers back to key-value string format for api_client
                param_keys_values = (
                    "\n".join([f"{k}: {v}" for k, v in params.items()])
                    if params
                    else ""
                )
                header_keys_values = (
                    "\n".join([f"{k}: {v}" for k, v in headers.items()])
                    if headers
                    else ""
                )
                additional_params_str = (
                    json.dumps(additional_params) if additional_params else "{}"
                )

                # Make the actual API call
                api_result = api_client.call_api(
                    method=method,
                    base_url=base_url,
                    endpoint=endpoint,
                    param_keys_values=param_keys_values,
                    header_keys_values=header_keys_values,
                    additional_params=additional_params_str,
                )

                # Determine if the call was successful
                is_successful = not (
                    isinstance(api_result, str) and api_result.startswith("Error")
                )
                error_message = api_result if not is_successful else None
                response_data = api_result if is_successful else None

                # Convert response to JSON if it's a string representation
                if is_successful and isinstance(response_data, str):
                    try:
                        if response_data.startswith(("{", "[")):
                            response_data = json.loads(response_data)
                    except json.JSONDecodeError:
                        # Keep as string if not valid JSON
                        pass

                job_conn = connect_to_db()
                try:
                    job_cur = job_conn.cursor()

                    # Mark config as active (only once, on first run). The
                    # captured snapshot is updated too; otherwise this UPDATE
                    # would be re-issued on every run.
                    if not config["is_active"]:
                        job_cur.execute(
                            """
                            UPDATE api_configurations SET is_active = %s WHERE config_id = %s
                            """,
                            (True, config_id),
                        )
                        job_conn.commit()
                        config["is_active"] = True
                        print(f"Marked configuration {config_id} as active.")

                    # Check if this is the last call by comparing current time to stop_at
                    current_time = datetime.now()
                    next_call_time = current_time + timedelta(
                        minutes=schedule_interval_minutes
                    )
                    if next_call_time >= stop_at:
                        # This is the last call, mark as inactive
                        job_cur.execute(
                            """
                            UPDATE api_configurations SET is_active = %s WHERE config_id = %s
                            """,
                            (False, config_id),
                        )
                        job_conn.commit()
                        print(
                            f"Last call for configuration {config_id}. Marked as inactive."
                        )

                    # Insert the actual API call result
                    job_cur.execute(
                        """
                        INSERT INTO api_call_results (
                            config_id, response_data, is_successful, error_message, called_at
                        ) VALUES (%s, %s, %s, %s, %s)
                        """,
                        (
                            config_id,
                            (
                                json.dumps(response_data)
                                if response_data is not None
                                else None
                            ),
                            is_successful,
                            error_message,
                            now,
                        ),
                    )
                    job_conn.commit()
                    job_cur.close()
                finally:
                    # Always release the connection, even when a statement fails.
                    job_conn.close()

                print(
                    f"API call result for {name}: {'Success' if is_successful else 'Failed'}"
                )
                if not is_successful:
                    print(f"Error: {error_message}")
            except Exception as job_exc:
                print(f"API monitoring job error for {name}: {job_exc}")
                # Best effort: record the failure so it shows up in the results.
                try:
                    err_conn = connect_to_db()
                    try:
                        err_cur = err_conn.cursor()
                        err_cur.execute(
                            """
                            INSERT INTO api_call_results (
                                config_id, response_data, is_successful, error_message, called_at
                            ) VALUES (%s, %s, %s, %s, %s)
                            """,
                            (
                                config_id,
                                None,
                                False,
                                f"Job execution error: {str(job_exc)}",
                                now,
                            ),
                        )
                        err_conn.commit()
                        err_cur.close()
                    finally:
                        err_conn.close()
                except Exception as db_exc:
                    print(
                        f"Failed to log error to database: {db_exc}"
                    )

        # Setup AsyncIO scheduler
        scheduler = AsyncIOScheduler()

        # Schedule the API monitoring job
        scheduler.add_job(
            api_monitoring_job,
            "interval",
            minutes=schedule_interval_minutes,
            start_date=start_at,
            end_date=stop_at,
            id=f"monitor_{config_id}",
        )
        scheduler.start()

        # Mark config as active as soon as the scheduler is running
        if not config["is_active"]:
            cur.execute(
                """
                UPDATE api_configurations SET is_active = %s WHERE config_id = %s
                """,
                (True, config_id),
            )
            conn.commit()
            config["is_active"] = True
            print(f"Marked configuration {config_id} as active.")

        return {
            "success": True,
            "message": f"Scheduler activated for '{name}'",
            "config_id": config_id,
            "schedule_interval_minutes": schedule_interval_minutes,
            "stop_at": stop_at.isoformat(),
            "next_call_at": (
                start_at + timedelta(minutes=schedule_interval_minutes)
            ).isoformat(),
        }
    except Exception as e:
        return {
            "success": False,
            "message": f"Failed to create scheduler: {str(e)}",
            "config_id": config_id,
        }
    finally:
        # Release the lookup cursor/connection on every exit path.
        if cur is not None:
            try:
                cur.close()
            except Exception:
                pass  # nothing useful can be done if closing fails
        if conn is not None:
            conn.close()
def retrieve_monitored_data(config_id, mcp_api_key, mode="summary"):
    """
    TOOL: Retrieve monitored data for a specific API configuration.

    PURPOSE: Fetch the latest monitored data for a given configuration ID.
    This is STEP 3 of the monitoring setup process.

    PREREQUISITE: Must call validate_api_configuration() first and obtain a config_id from successful validation, then activate_monitoring() to start monitoring.
    This function can be called at any time after monitoring activation to retrieve the latest data collected by the monitoring system.

    ARGUMENTS:
    - config_id: The ID of the API configuration to retrieve data for (required)
    - mcp_api_key: User's MCP API key for verification (must match validation step).
    - mode: Data return mode - "summary" (LLM-optimized), "details" (full responses, minimal metadata), "full" (everything)

    Input Examples:
    1. Retrieve data for stock monitoring:
        config_id: 123456789
        mcp_api_key: "your_mcp_key_here"
    2. Retrieve data for weather alerts:
        config_id: 987654321
        mcp_api_key: "your_mcp_key_here"

    Returns:
    - Dictionary with monitoring status in one of three formats based on mode parameter

    SUMMARY mode (LLM-optimized, default):
    {
        "success": True,
        "config_name": "Weather Alert Monitor",
        "summary": {
            "status": "active", // "active", "inactive"
            "health": "good", // "good", "degraded", "no_data"
            "calls_made": 15,
            "success_rate": 93.3,
            "last_call": "2025-06-05T15:20:00",
            "last_success": "2025-06-05T15:20:00"
        },
        "recent_calls": [
            {
                "timestamp": "2025-06-05T15:20:00",
                "success": true,
                "error": null,
                "response_preview": "{'alerts': [{'type': 'tornado'}]}..." // truncated to 150 characters
            }
            // ... up to 5 most recent calls
        ],
        "full_data_available": 15,
        "monitoring_details": {
            "interval_minutes": 20,
            "is_finished": false
        }
    }

    DETAILS mode (full responses, minimal metadata):
    {
        "success": True,
        "config_name": "Weather Alert Monitor",
        "status": "active",
        "calls_made": 15,
        "success_rate": 93.3,
        "recent_responses": [
            {
                "timestamp": "2025-06-05T15:20:00",
                "success": true,
                "response_data": {...}, // full response data
                "error": null
            }
            // ... up to 10 most recent calls with full responses
        ]
    }

    FULL mode (everything):
    {
        "success": True,
        "config_name": "Weather Alert Monitor",
        "config_description": "Monitor severe weather alerts",
        "is_active": True,
        "is_finished": False,
        "progress": {...},
        "schedule_info": {...},
        "data": [...] // all historical data
    }

    Error return format:
    {
        "success": False,
        "message": "Invalid config_id or mcp_api_key"
    }

    ERROR HANDLING: If config_id not found or invalid, returns success=False with error message
    """

    def _to_naive_local(value):
        # Accept a datetime or an ISO string (optionally ending in "Z") and
        # return a naive local datetime comparable with datetime.now().
        if not isinstance(value, datetime):
            value = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
        if value.tzinfo is not None:
            value = value.astimezone().replace(tzinfo=None)
        return value

    def _iso(value):
        # Render a stored timestamp for output; values that are already text
        # (no isoformat method) are passed through as strings.
        if not value:
            return None
        return value.isoformat() if hasattr(value, "isoformat") else str(value)

    conn = None
    cur = None
    try:
        if not mcp_api_key or not mcp_api_key.strip():
            mcp_api_key = os.getenv("MCP_API_KEY", "")
            if not mcp_api_key or not mcp_api_key.strip():
                return {
                    "success": False,
                    "message": "MCP API key is required",
                    "config_id": None,
                }

        # Verify the MCP API key with the key generation server first
        key_verification = verify_mcp_api_key(mcp_api_key)
        if not key_verification["success"]:
            return {
                "success": False,
                "message": f"API key verification failed: {key_verification['message']}",
                "data": [],
            }

        conn = connect_to_db()
        cur = conn.cursor()
        cur.execute(
            "SELECT * FROM api_configurations WHERE config_id = %s", (config_id,)
        )
        config_row = cur.fetchone()
        if not config_row:
            return {
                "success": False,
                "message": "Invalid config_id",
                "data": [],
            }

        config = dict(config_row)
        # Keep the stored API key out of the process output.
        printable_config = {k: v for k, v in config.items() if k != "mcp_api_key"}
        print(f"Retrieved config: {printable_config}")

        if config["mcp_api_key"] != mcp_api_key:
            return {
                "success": False,
                "message": "Invalid mcp_api_key. You are not authorized to access this configuration.",
                "data": [],
            }

        # Query the api_call_results table for monitored data
        cur.execute(
            "SELECT * FROM api_call_results WHERE config_id = %s ORDER BY called_at DESC",
            (config_id,),
        )
        monitored_data_rows = cur.fetchall()

        # Convert rows to dictionaries and format timestamps
        monitored_data = []
        for row in monitored_data_rows:
            row_dict = dict(row)
            if row_dict.get("called_at"):
                row_dict["called_at"] = _iso(row_dict["called_at"])
            monitored_data.append(row_dict)

        # Check if monitoring is finished
        now = datetime.now()
        stop_at_time = config.get("stop_at")
        if stop_at_time:
            is_finished = now > _to_naive_local(stop_at_time)
        else:
            is_finished = False

        # Calculate progress statistics
        total_expected_calls = 0
        if config.get("start_at") and config.get("schedule_interval_minutes"):
            start_dt = _to_naive_local(config["start_at"])
            elapsed_minutes = (now - start_dt).total_seconds() / 60
            if elapsed_minutes > 0:
                total_expected_calls = max(
                    1, int(elapsed_minutes / float(config["schedule_interval_minutes"]))
                )

        # Get success/failure counts
        successful_calls = len(
            [d for d in monitored_data if d.get("is_successful", False)]
        )
        failed_calls = len(
            [d for d in monitored_data if not d.get("is_successful", True)]
        )
        total_calls = len(monitored_data)

        # Create simplified summary for LLM consumption
        summary = {
            "status": (
                "active"
                if config.get("is_active", False) and not is_finished
                else "inactive"
            ),
            "health": (
                "good"
                if total_calls > 0 and (successful_calls / total_calls) > 0.8
                else "degraded" if total_calls > 0 else "no_data"
            ),
            "calls_made": total_calls,
            "success_rate": (
                round(successful_calls / total_calls * 100, 1) if total_calls > 0 else 0
            ),
            # Rows are ordered newest first, so index 0 is the latest call.
            "last_call": monitored_data[0]["called_at"] if monitored_data else None,
            "last_success": next(
                (d["called_at"] for d in monitored_data if d.get("is_successful")), None
            ),
        }

        # Handle different return modes
        if mode == "full":
            # Return complete detailed data (original detailed format)
            return {
                "success": True,
                "message": f"Full data retrieved for config_id {config_id}",
                "config_name": config.get("name", "Unknown"),
                "config_description": config.get("description", ""),
                "is_active": config.get("is_active", False),
                "is_finished": is_finished,
                "progress": {
                    "total_calls": total_calls,
                    "successful_calls": successful_calls,
                    "failed_calls": failed_calls,
                    "expected_calls": total_expected_calls,
                    "success_rate": (
                        round(successful_calls / total_calls * 100, 2)
                        if total_calls > 0
                        else 0
                    ),
                },
                "schedule_info": {
                    "interval_minutes": config.get("schedule_interval_minutes"),
                    "start_at": _iso(config.get("start_at")),
                    "stop_at": _iso(config.get("stop_at")),
                },
                "data": monitored_data,
            }
        elif mode == "details":
            # Return full response data but minimal metadata (up to 10 recent calls)
            recent_responses = [
                {
                    "timestamp": item["called_at"],
                    "success": item.get("is_successful", False),
                    "response_data": item.get("response_data"),
                    "error": (
                        item.get("error_message")
                        if not item.get("is_successful")
                        else None
                    ),
                }
                for item in monitored_data[:10]
            ]
            return {
                "success": True,
                "config_name": config.get("name", "Unknown"),
                "status": summary["status"],
                "calls_made": total_calls,
                "success_rate": summary["success_rate"],
                "recent_responses": recent_responses,
            }
        else:  # mode == "summary" (default)
            # Get recent data (last 5 calls) with essential info only
            recent_data = [
                {
                    "timestamp": item["called_at"],
                    "success": item.get("is_successful", False),
                    "error": (
                        item.get("error_message")
                        if not item.get("is_successful")
                        else None
                    ),
                    "response_preview": (
                        str(item.get("response_data", ""))[:150] + "..."
                        if item.get("response_data")
                        else None
                    ),
                }
                for item in monitored_data[:5]
            ]
            return {
                "success": True,
                "config_name": config.get("name", "Unknown"),
                "summary": summary,
                "recent_calls": recent_data,
                "full_data_available": len(monitored_data),
                "monitoring_details": {
                    "interval_minutes": config.get("schedule_interval_minutes"),
                    "is_finished": is_finished,
                },
            }
    except Exception as e:
        return {
            "success": False,
            "message": f"Database connection failed: {str(e)}",
            "data": [],
        }
    finally:
        # Release the cursor/connection on every exit path, including success.
        if cur is not None:
            try:
                cur.close()
            except Exception:
                pass  # nothing useful can be done if closing fails
        if conn is not None:
            conn.close()