Googolplexic commited on
Commit
742b39f
·
1 Parent(s): f5f7336

added actuall api call into activate_monitoring; added api key validation

Browse files
Files changed (1) hide show
  1. api_monitor.py +204 -51
api_monitor.py CHANGED
@@ -6,10 +6,9 @@ import psycopg2
6
  import psycopg2.extras
7
  import os
8
  from dotenv import load_dotenv
9
- import time
10
- import threading
11
- import asyncio
12
  from apscheduler.schedulers.asyncio import AsyncIOScheduler
 
13
 
14
 
15
  # Load environment variables from .env file
@@ -36,6 +35,48 @@ def connect_to_db():
36
  )
37
 
38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  def validate_api_configuration(
40
  mcp_api_key,
41
  name,
@@ -122,9 +163,7 @@ def validate_api_configuration(
122
  "sample_response": {...},
123
  "stop_at": "2025-06-11T12:00:00Z",
124
  "start_at": "2025-06-04T12:00:00Z"
125
- }
126
-
127
- NEXT STEP: If success=True, call activate_monitoring(config_id, mcp_api_key) to activate monitoring
128
  """
129
  try:
130
  # Validate input parameters
@@ -135,6 +174,15 @@ def validate_api_configuration(
135
  "config_id": None,
136
  }
137
 
 
 
 
 
 
 
 
 
 
138
  if not name or not name.strip():
139
  return {
140
  "success": False,
@@ -347,8 +395,8 @@ async def activate_monitoring(config_id, mcp_api_key):
347
  ERROR HANDLING: If config_id not found or invalid, returns success=False with error message
348
  """
349
 
350
- #need to extract
351
- '''
352
  mcp_api_key,
353
  name,
354
  description,
@@ -362,18 +410,27 @@ async def activate_monitoring(config_id, mcp_api_key):
362
  stop_after_hours,
363
  time_to_start,
364
  this
365
- '''
366
 
367
  # using time_to_start, schedule_interval_minutes, and stop_after_hours
368
- #label using name and description
369
-
370
-
371
 
372
- #attempt to create the scheduler
373
  try:
 
 
 
 
 
 
 
 
 
374
  conn = connect_to_db()
375
  cur = conn.cursor()
376
- cur.execute("SELECT * FROM api_configurations WHERE config_id = %s", (config_id,))
 
 
377
  config_row = cur.fetchone()
378
  if not config_row:
379
  conn.close()
@@ -389,12 +446,11 @@ async def activate_monitoring(config_id, mcp_api_key):
389
  "success": False,
390
  "message": "Invalid mcp_api_key. You are not authorized to activate this configuration.",
391
  "config_id": config_id,
392
- }
393
- # Extract scheduling parameters
394
  name = config.get("name", "Unknown")
395
  schedule_interval_minutes = float(config.get("schedule_interval_minutes", 20))
396
  stop_at = config.get("stop_at")
397
- start_at = config.get("time_to_start")
398
  if not start_at:
399
  start_at = datetime.now()
400
  else:
@@ -404,23 +460,90 @@ async def activate_monitoring(config_id, mcp_api_key):
404
  stop_at = start_at + timedelta(hours=config.get("stop_after_hours", 24))
405
  else:
406
  if not isinstance(stop_at, datetime):
407
- stop_at = datetime.fromisoformat(str(stop_at))
408
- # Dummy function to be scheduled
409
- def dummy_job():
 
 
410
  now = datetime.now()
411
  next_call = now + timedelta(minutes=schedule_interval_minutes)
412
- print(f"Executing job for {name} at {now.isoformat()}. Next call at {next_call.isoformat()}")
 
 
413
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
414
  job_conn = connect_to_db()
415
  job_cur = job_conn.cursor()
 
416
  # Mark config as active (only once, on first run)
417
  job_cur.execute(
418
  """
419
  UPDATE api_configurations SET is_active = %s WHERE config_id = %s
420
  """,
421
- (True, config_id)
422
  )
423
- # Insert a dummy result into api_call_results
 
424
  job_cur.execute(
425
  """
426
  INSERT INTO api_call_results (
@@ -429,42 +552,62 @@ async def activate_monitoring(config_id, mcp_api_key):
429
  """,
430
  (
431
  config_id,
432
- json.dumps({
433
- "success": True,
434
- "message": f"Scheduler activated for '{name}'",
435
- "config_id": config_id,
436
- "schedule_interval_minutes": schedule_interval_minutes,
437
- "stop_at": stop_at.isoformat(),
438
- "next_call_at": next_call.isoformat(),
439
- }),
440
- True,
441
- None,
442
  now,
443
- )
444
  )
445
  job_conn.commit()
446
  job_cur.close()
447
  job_conn.close()
 
 
 
 
 
 
 
448
  except Exception as job_exc:
449
- print(f"Dummy job DB error: {job_exc}")
450
- return {
451
- "success": True,
452
- "message": f"Scheduler activated for '{name}'",
453
- "config_id": config_id,
454
- "schedule_interval_minutes": schedule_interval_minutes,
455
- "stop_at": stop_at.isoformat(),
456
- "next_call_at": next_call.isoformat(),
457
- }
458
- # Setup AsyncIO scheduler
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
459
  scheduler = AsyncIOScheduler()
460
- # Schedule the dummy job
461
  scheduler.add_job(
462
- dummy_job,
463
- 'interval',
464
  minutes=schedule_interval_minutes,
465
  start_date=start_at,
466
  end_date=stop_at,
467
- id=f"monitor_{config_id}"
468
  )
469
  scheduler.start()
470
  conn.close()
@@ -474,7 +617,9 @@ async def activate_monitoring(config_id, mcp_api_key):
474
  "config_id": config_id,
475
  "schedule_interval_minutes": schedule_interval_minutes,
476
  "stop_at": stop_at.isoformat(),
477
- "next_call_at": (start_at + timedelta(minutes=schedule_interval_minutes)).isoformat(),
 
 
478
  }
479
  except Exception as e:
480
  return {
@@ -574,6 +719,15 @@ def retrieve_monitored_data(config_id, mcp_api_key, mode="summary"):
574
  ERROR HANDLING: If config_id not found or invalid, returns success=False with error message
575
  """
576
  try:
 
 
 
 
 
 
 
 
 
577
  conn = connect_to_db()
578
  cur = conn.cursor()
579
  cur.execute(
@@ -780,6 +934,7 @@ def retrieve_monitored_data(config_id, mcp_api_key, mode="summary"):
780
  "data": [],
781
  }
782
 
 
783
  ## testing
784
  if __name__ == "__main__":
785
  validation_response = validate_api_configuration(
@@ -813,5 +968,3 @@ if __name__ == "__main__":
813
  mcp_api_key="your_api_key",
814
  )
815
  print(json.dumps(response, indent=2, default=str))
816
-
817
-
 
6
  import psycopg2.extras
7
  import os
8
  from dotenv import load_dotenv
9
+
 
 
10
  from apscheduler.schedulers.asyncio import AsyncIOScheduler
11
+ import requests
12
 
13
 
14
  # Load environment variables from .env file
 
35
  )
36
 
37
 
38
+ def verify_mcp_api_key(api_key):
39
+ """
40
+ Verify the MCP API key with the key generation server.
41
+
42
+ Parameters:
43
+ - api_key: The MCP API key to verify
44
+
45
+ Returns:
46
+ - Dictionary with success status and message
47
+ """
48
+ try:
49
+ # Get the key server URL from environment or use default
50
+ key_server_url = os.getenv("KEY_SERVER_URL", "http://localhost:3001")
51
+
52
+ response = requests.post(
53
+ f"{key_server_url}/api/verifyKey",
54
+ json={"apiKey": api_key},
55
+ headers={"Content-Type": "application/json"},
56
+ timeout=10,
57
+ )
58
+
59
+ if response.status_code == 200:
60
+ data = response.json()
61
+ if data.get("valid"):
62
+ return {"success": True, "message": "API key is valid"}
63
+ else:
64
+ return {"success": False, "message": "API key is invalid"}
65
+ else:
66
+ return {
67
+ "success": False,
68
+ "message": f"Key verification failed with status {response.status_code}",
69
+ }
70
+
71
+ except requests.exceptions.RequestException as e:
72
+ return {
73
+ "success": False,
74
+ "message": f"Failed to connect to key verification service: {str(e)}",
75
+ }
76
+ except Exception as e:
77
+ return {"success": False, "message": f"Key verification error: {str(e)}"}
78
+
79
+
80
  def validate_api_configuration(
81
  mcp_api_key,
82
  name,
 
163
  "sample_response": {...},
164
  "stop_at": "2025-06-11T12:00:00Z",
165
  "start_at": "2025-06-04T12:00:00Z"
166
+ } NEXT STEP: If success=True, call activate_monitoring(config_id, mcp_api_key) to activate monitoring
 
 
167
  """
168
  try:
169
  # Validate input parameters
 
174
  "config_id": None,
175
  }
176
 
177
+ # Verify the MCP API key with the key generation server
178
+ key_verification = verify_mcp_api_key(mcp_api_key)
179
+ if not key_verification["success"]:
180
+ return {
181
+ "success": False,
182
+ "message": f"API key verification failed: {key_verification['message']}",
183
+ "config_id": None,
184
+ }
185
+
186
  if not name or not name.strip():
187
  return {
188
  "success": False,
 
395
  ERROR HANDLING: If config_id not found or invalid, returns success=False with error message
396
  """
397
 
398
+ # need to extract
399
+ """
400
  mcp_api_key,
401
  name,
402
  description,
 
410
  stop_after_hours,
411
  time_to_start,
412
  this
413
+ """
414
 
415
  # using time_to_start, schedule_interval_minutes, and stop_after_hours
416
+ # label using name and description
 
 
417
 
418
+ # attempt to create the scheduler
419
  try:
420
+ # Verify the MCP API key with the key generation server first
421
+ key_verification = verify_mcp_api_key(mcp_api_key)
422
+ if not key_verification["success"]:
423
+ return {
424
+ "success": False,
425
+ "message": f"API key verification failed: {key_verification['message']}",
426
+ "config_id": config_id,
427
+ }
428
+
429
  conn = connect_to_db()
430
  cur = conn.cursor()
431
+ cur.execute(
432
+ "SELECT * FROM api_configurations WHERE config_id = %s", (config_id,)
433
+ )
434
  config_row = cur.fetchone()
435
  if not config_row:
436
  conn.close()
 
446
  "success": False,
447
  "message": "Invalid mcp_api_key. You are not authorized to activate this configuration.",
448
  "config_id": config_id,
449
+ } # Extract scheduling parameters
 
450
  name = config.get("name", "Unknown")
451
  schedule_interval_minutes = float(config.get("schedule_interval_minutes", 20))
452
  stop_at = config.get("stop_at")
453
+ start_at = config.get("start_at")
454
  if not start_at:
455
  start_at = datetime.now()
456
  else:
 
460
  stop_at = start_at + timedelta(hours=config.get("stop_after_hours", 24))
461
  else:
462
  if not isinstance(stop_at, datetime):
463
+ stop_at = datetime.fromisoformat(
464
+ str(stop_at)
465
+ ) # Job function to make actual API calls
466
+
467
+ def api_monitoring_job():
468
  now = datetime.now()
469
  next_call = now + timedelta(minutes=schedule_interval_minutes)
470
+ print(
471
+ f"Executing API monitoring job for {name} at {now.isoformat()}. Next call at {next_call.isoformat()}"
472
+ )
473
  try:
474
+ # Extract API configuration parameters
475
+ method = config.get("method", "GET")
476
+ base_url = config.get("base_url")
477
+ endpoint = config.get("endpoint", "")
478
+ params = config.get("params", {})
479
+ headers = config.get("headers", {})
480
+ additional_params = config.get("additional_params", {})
481
+
482
+ # Convert JSON strings back to dicts if needed
483
+ if isinstance(params, str):
484
+ params = json.loads(params) if params else {}
485
+ if isinstance(headers, str):
486
+ headers = json.loads(headers) if headers else {}
487
+ if isinstance(additional_params, str):
488
+ additional_params = (
489
+ json.loads(additional_params) if additional_params else {}
490
+ )
491
+
492
+ # Convert params and headers back to key-value string format for api_client
493
+ param_keys_values = (
494
+ "\n".join([f"{k}: {v}" for k, v in params.items()])
495
+ if params
496
+ else ""
497
+ )
498
+ header_keys_values = (
499
+ "\n".join([f"{k}: {v}" for k, v in headers.items()])
500
+ if headers
501
+ else ""
502
+ )
503
+ additional_params_str = (
504
+ json.dumps(additional_params) if additional_params else "{}"
505
+ )
506
+
507
+ # Make the actual API call
508
+ api_result = api_client.call_api(
509
+ method=method,
510
+ base_url=base_url,
511
+ endpoint=endpoint,
512
+ param_keys_values=param_keys_values,
513
+ header_keys_values=header_keys_values,
514
+ additional_params=additional_params_str,
515
+ )
516
+
517
+ # Determine if the call was successful
518
+ is_successful = not (
519
+ isinstance(api_result, str) and api_result.startswith("Error")
520
+ )
521
+ error_message = api_result if not is_successful else None
522
+ response_data = api_result if is_successful else None
523
+
524
+ # Convert response to JSON if it's a string representation
525
+ if is_successful and isinstance(response_data, str):
526
+ try:
527
+ if response_data.startswith("{") or response_data.startswith(
528
+ "["
529
+ ):
530
+ response_data = json.loads(response_data)
531
+ except json.JSONDecodeError:
532
+ # Keep as string if not valid JSON
533
+ pass
534
+
535
  job_conn = connect_to_db()
536
  job_cur = job_conn.cursor()
537
+
538
  # Mark config as active (only once, on first run)
539
  job_cur.execute(
540
  """
541
  UPDATE api_configurations SET is_active = %s WHERE config_id = %s
542
  """,
543
+ (True, config_id),
544
  )
545
+
546
+ # Insert the actual API call result
547
  job_cur.execute(
548
  """
549
  INSERT INTO api_call_results (
 
552
  """,
553
  (
554
  config_id,
555
+ (
556
+ json.dumps(response_data)
557
+ if response_data is not None
558
+ else None
559
+ ),
560
+ is_successful,
561
+ error_message,
 
 
 
562
  now,
563
+ ),
564
  )
565
  job_conn.commit()
566
  job_cur.close()
567
  job_conn.close()
568
+
569
+ print(
570
+ f"API call result for {name}: {'Success' if is_successful else 'Failed'}"
571
+ )
572
+ if not is_successful:
573
+ print(f"Error: {error_message}")
574
+
575
  except Exception as job_exc:
576
+ print(f"API monitoring job error for {name}: {job_exc}")
577
+ try:
578
+ job_conn = connect_to_db()
579
+ job_cur = job_conn.cursor()
580
+ job_cur.execute(
581
+ """
582
+ INSERT INTO api_call_results (
583
+ config_id, response_data, is_successful, error_message, called_at
584
+ ) VALUES (%s, %s, %s, %s, %s)
585
+ """,
586
+ (
587
+ config_id,
588
+ None,
589
+ False,
590
+ f"Job execution error: {str(job_exc)}",
591
+ now,
592
+ ),
593
+ )
594
+ job_conn.commit()
595
+ job_cur.close()
596
+ job_conn.close()
597
+ except Exception as db_exc:
598
+ print(
599
+ f"Failed to log error to database: {db_exc}"
600
+ ) # Setup AsyncIO scheduler
601
+
602
  scheduler = AsyncIOScheduler()
603
+ # Schedule the API monitoring job
604
  scheduler.add_job(
605
+ api_monitoring_job,
606
+ "interval",
607
  minutes=schedule_interval_minutes,
608
  start_date=start_at,
609
  end_date=stop_at,
610
+ id=f"monitor_{config_id}",
611
  )
612
  scheduler.start()
613
  conn.close()
 
617
  "config_id": config_id,
618
  "schedule_interval_minutes": schedule_interval_minutes,
619
  "stop_at": stop_at.isoformat(),
620
+ "next_call_at": (
621
+ start_at + timedelta(minutes=schedule_interval_minutes)
622
+ ).isoformat(),
623
  }
624
  except Exception as e:
625
  return {
 
719
  ERROR HANDLING: If config_id not found or invalid, returns success=False with error message
720
  """
721
  try:
722
+ # Verify the MCP API key with the key generation server first
723
+ key_verification = verify_mcp_api_key(mcp_api_key)
724
+ if not key_verification["success"]:
725
+ return {
726
+ "success": False,
727
+ "message": f"API key verification failed: {key_verification['message']}",
728
+ "data": [],
729
+ }
730
+
731
  conn = connect_to_db()
732
  cur = conn.cursor()
733
  cur.execute(
 
934
  "data": [],
935
  }
936
 
937
+
938
  ## testing
939
  if __name__ == "__main__":
940
  validation_response = validate_api_configuration(
 
968
  mcp_api_key="your_api_key",
969
  )
970
  print(json.dumps(response, indent=2, default=str))