potatooine commited on
Commit
45742e0
·
1 Parent(s): 35489cf
Files changed (1) hide show
  1. api_monitor.py +68 -99
api_monitor.py CHANGED
@@ -11,6 +11,8 @@ from apscheduler.triggers.interval import IntervalTrigger
11
  from apscheduler.triggers.date import DateTrigger
12
  import time
13
  import threading
 
 
14
 
15
  # Load environment variables from .env file
16
  load_dotenv()
@@ -331,7 +333,7 @@ def validate_api_configuration(
331
  }
332
 
333
 
334
- def activate_monitoring(config_id, mcp_api_key):
335
  """
336
  TOOL: Activate periodic monitoring for a validated API configuration.
337
 
@@ -379,26 +381,6 @@ def activate_monitoring(config_id, mcp_api_key):
379
  ERROR HANDLING: If config_id not found or invalid, returns success=False with error message
380
  """
381
 
382
- """
383
- try:
384
- conn = connect_to_db()
385
- # TODO: Implement activation logic here
386
- #problem is that we probably want activation to be completely separate from everything
387
- conn.close()
388
-
389
- except Exception as e:
390
- return {
391
- "success": False,
392
- "message": f"Database connection failed: {str(e)}",
393
- "config_id": config_id,
394
- }
395
- """
396
-
397
- #get config from database
398
-
399
-
400
- #check if config_id is valid!
401
-
402
  #need to extract
403
  '''
404
  mcp_api_key,
@@ -419,35 +401,81 @@ def activate_monitoring(config_id, mcp_api_key):
419
  # using time_to_start, schedule_interval_minutes, and stop_after_hours
420
  #label using name and description
421
 
422
-
423
-
424
-
425
 
426
 
427
  #attempt to create the scheduler
428
  try:
429
- create_schedule_thread(
430
- tag = "defualt tag", # this is not a function
431
- start_delay_sec=1, # Use the time_to_start if provided
432
- interval_sec=3, # Convert schedule_interval_minutes to seconds
433
- duration_hours=0.015, # Use stop_after_hours if provided
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
434
  )
435
-
436
-
437
-
438
-
439
-
440
- except Exception as e:
441
  return {
442
- "success": False,
443
- "message": f"Failed to create scheduler: {str(e)}",
444
  "config_id": config_id,
 
 
 
445
  }
446
- # if we get down here something is horribly wrong, we should never get her
447
  except Exception as e:
448
  return {
449
  "success": False,
450
- "message": f"Database connection failed: {str(e)}",
451
  "config_id": config_id,
452
  }
453
 
@@ -748,65 +776,6 @@ def retrieve_monitored_data(config_id, mcp_api_key, mode="summary"):
748
  "data": [],
749
  }
750
 
751
- # 🔁 Create a new generator per schedule
752
- def make_data_generator(tag):
753
- i = 0
754
- while True:
755
- yield f"{tag}-{i}"
756
- i += 1
757
-
758
- # 🧱 Function that runs inside each thread
759
- def schedule_runner(tag, start_delay_sec, interval_sec, duration_hours):
760
- scheduler = BackgroundScheduler()
761
- start_time = datetime.now() + timedelta(seconds=start_delay_sec)
762
- end_time = start_time + timedelta(hours=duration_hours)
763
- generator = make_data_generator(tag)
764
-
765
- # The actual job to be scheduled
766
- def job_func():
767
- value = next(generator)
768
- print(f"[{tag}] 🔄 Job ran with: {value} at {datetime.now()}")
769
- yield {
770
- "success": True,
771
- "message": f"[{tag}] 🔄 Job ran with: {value} at {datetime.now()}"}
772
-
773
-
774
- # Graceful shutdown job
775
- def shutdown_func():
776
- print(f"[{tag}] ⛔ Scheduler shutting down at {datetime.now()}")
777
- scheduler.shutdown()
778
-
779
- # Add the jobs
780
- scheduler.add_job(
781
- job_func,
782
- trigger=IntervalTrigger(start_date=start_time, seconds=interval_sec, end_date=end_time)
783
- )
784
- scheduler.add_job(
785
- shutdown_func,
786
- trigger=DateTrigger(run_date=end_time)
787
- )
788
-
789
- scheduler.start()
790
- print(f"[{tag}] 🕒 Scheduler running from {start_time} to {end_time}, every {interval_sec}s.")
791
-
792
- # Keep thread alive while scheduler is running
793
- while scheduler.running:
794
- time.sleep(1)
795
-
796
- print(f"[{tag}] ✅ Scheduler completed.")
797
-
798
- # 🧵 Thread spawner
799
- def create_schedule_thread(tag, start_delay_sec, interval_sec, duration_hours):
800
- thread = threading.Thread(
801
- target=schedule_runner,
802
- args=(tag, start_delay_sec, interval_sec, duration_hours),
803
- daemon=True # Automatically closes with main program
804
- )
805
- thread.start()
806
- return thread
807
-
808
-
809
-
810
  ## testing
811
  if __name__ == "__main__":
812
  validation_response = validate_api_configuration(
@@ -841,4 +810,4 @@ if __name__ == "__main__":
841
  )
842
  print(json.dumps(response, indent=2, default=str))
843
 
844
-
 
11
  from apscheduler.triggers.date import DateTrigger
12
  import time
13
  import threading
14
+ import asyncio
15
+ from apscheduler.schedulers.asyncio import AsyncIOScheduler
16
 
17
  # Load environment variables from .env file
18
  load_dotenv()
 
333
  }
334
 
335
 
336
+ async def activate_monitoring(config_id, mcp_api_key):
337
  """
338
  TOOL: Activate periodic monitoring for a validated API configuration.
339
 
 
381
  ERROR HANDLING: If config_id not found or invalid, returns success=False with error message
382
  """
383
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
384
  #need to extract
385
  '''
386
  mcp_api_key,
 
401
  # using time_to_start, schedule_interval_minutes, and stop_after_hours
402
  #label using name and description
403
 
 
 
 
404
 
405
 
406
  #attempt to create the scheduler
407
  try:
408
+ conn = connect_to_db()
409
+ cur = conn.cursor()
410
+ cur.execute("SELECT * FROM api_configurations WHERE config_id = %s", (config_id,))
411
+ config_row = cur.fetchone()
412
+ if not config_row:
413
+ conn.close()
414
+ return {
415
+ "success": False,
416
+ "message": "Invalid config_id",
417
+ "config_id": config_id,
418
+ }
419
+ config = dict(config_row)
420
+ if config["mcp_api_key"] != mcp_api_key:
421
+ conn.close()
422
+ return {
423
+ "success": False,
424
+ "message": "Invalid mcp_api_key. You are not authorized to activate this configuration.",
425
+ "config_id": config_id,
426
+ }
427
+ # Extract scheduling parameters
428
+ name = config.get("name", "Unknown")
429
+ schedule_interval_minutes = config.get("schedule_interval_minutes", 20)
430
+ stop_at = config.get("stop_at")
431
+ start_at = config.get("time_to_start")
432
+ if not start_at:
433
+ start_at = datetime.now()
434
+ else:
435
+ if not isinstance(start_at, datetime):
436
+ start_at = datetime.fromisoformat(str(start_at))
437
+ if not stop_at:
438
+ stop_at = start_at + timedelta(hours=config.get("stop_after_hours", 24))
439
+ else:
440
+ if not isinstance(stop_at, datetime):
441
+ stop_at = datetime.fromisoformat(str(stop_at))
442
+ # Dummy function to be scheduled
443
+ def dummy_job():
444
+ now = datetime.now()
445
+ next_call = now + timedelta(minutes=schedule_interval_minutes)
446
+ return {
447
+ "success": True,
448
+ "message": f"Scheduler activated for '{name}'",
449
+ "config_id": config_id,
450
+ "schedule_interval_minutes": schedule_interval_minutes,
451
+ "stop_at": stop_at.isoformat(),
452
+ "next_call_at": next_call.isoformat(),
453
+ }
454
+ # Setup AsyncIO scheduler
455
+ scheduler = AsyncIOScheduler()
456
+ # Schedule the dummy job
457
+ scheduler.add_job(
458
+ dummy_job,
459
+ 'interval',
460
+ minutes=schedule_interval_minutes,
461
+ start_date=start_at,
462
+ end_date=stop_at,
463
+ id=f"monitor_{config_id}"
464
  )
465
+ scheduler.start()
466
+ conn.close()
 
 
 
 
467
  return {
468
+ "success": True,
469
+ "message": f"Scheduler activated for '{name}'",
470
  "config_id": config_id,
471
+ "schedule_interval_minutes": schedule_interval_minutes,
472
+ "stop_at": stop_at.isoformat(),
473
+ "next_call_at": (start_at + timedelta(minutes=schedule_interval_minutes)).isoformat(),
474
  }
 
475
  except Exception as e:
476
  return {
477
  "success": False,
478
+ "message": f"Failed to create scheduler: {str(e)}",
479
  "config_id": config_id,
480
  }
481
 
 
776
  "data": [],
777
  }
778
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
779
  ## testing
780
  if __name__ == "__main__":
781
  validation_response = validate_api_configuration(
 
810
  )
811
  print(json.dumps(response, indent=2, default=str))
812
 
813
+