|
|
|
|
|
|
|
import os |
|
import dspy |
|
import logging |
|
import time |
|
from datetime import datetime, timedelta |
|
from dotenv import load_dotenv |
|
from dspy_llm_wrapper import SyncCustomGeminiDspyLM |
|
|
|
|
|
# Load variables from a .env file (if any) into the process environment
# before anything below reads os.environ.
load_dotenv()

# Configure the root logger once at import: INFO and above, using a
# str.format-style ("{") record template.
logging.basicConfig(
    level=logging.INFO,
    format="{levelname} {asctime} {name}: {message}",
    style="{",
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
class RateLimiter:
    """Sliding-window limiter that blocks callers to cap the call rate.

    At most ``max_calls`` calls are permitted within any ``time_period``
    seconds. Timestamps are taken from ``time.monotonic()`` so that wall-clock
    adjustments (DST shifts, NTP corrections, manual clock changes) can neither
    stall the limiter nor let calls through early.

    No locking is performed; the instance is meant to be driven from a single
    thread.

    Attributes:
        max_calls: Maximum number of calls allowed per window.
        time_period: Window length in seconds.
        call_timestamps: ``time.monotonic()`` readings (floats, oldest first)
            of the calls recorded so far; entries older than the window are
            dropped on each ``wait_if_needed`` call.
        total_calls_made: Number of calls permitted since construction.
    """

    def __init__(self, max_calls: int = 7, time_period: int = 60):
        """Initialize the limiter.

        Args:
            max_calls: The maximum number of calls allowed in the time period.
            time_period: The time period in seconds.

        Raises:
            ValueError: If either argument is not a positive integer.
        """
        if not isinstance(max_calls, int) or max_calls <= 0:
            raise ValueError("max_calls must be a positive integer.")
        if not isinstance(time_period, int) or time_period <= 0:
            raise ValueError("time_period must be a positive integer.")

        self.max_calls = max_calls
        self.time_period = time_period
        self.call_timestamps = []
        self.total_calls_made = 0
        logger.info(
            "Rate Limiter Initialized: Max %s calls per %s seconds.",
            self.max_calls,
            self.time_period,
        )

    def wait_if_needed(self):
        """Block until another call fits in the window, then record it.

        If the window is already full, sleeps until the oldest recorded call
        leaves the window. The call is then timestamped and counted.
        """
        now = time.monotonic()

        # Keep only the calls that are still inside the sliding window.
        window_start = now - self.time_period
        self.call_timestamps = [
            ts for ts in self.call_timestamps if ts > window_start
        ]

        if len(self.call_timestamps) >= self.max_calls:
            # A slot frees up when the oldest call in the window expires.
            wait_seconds = self.call_timestamps[0] + self.time_period - now
            if wait_seconds > 0:
                logger.warning(
                    "[Rate Limiter]: Limit of %s calls reached. "
                    "Waiting for %.2f seconds.",
                    self.max_calls,
                    wait_seconds,
                )
                time.sleep(wait_seconds)

        # Re-read the clock: we may have slept since `now` was taken.
        self.call_timestamps.append(time.monotonic())
        self.total_calls_made += 1
        logger.info(
            "[Rate Limiter]: Call #%s permitted.", self.total_calls_made
        )
|
|
|
|
|
# Google API key captured once at import time; None when the variable is unset.
API_KEY = os.environ.get("GOOGLE_API_KEY")
|
|
|
def initialize_dspy(
    model_name: str = "gemini/gemini-2.5-flash-preview-04-17",
    api_key: "str | None" = None,
    **kwargs
) -> "SyncCustomGeminiDspyLM | None":
    """Create a Gemini LM, verify it with one test call, and register it with DSPy.

    Args:
        model_name: Model identifier passed to ``SyncCustomGeminiDspyLM``.
        api_key: API key to use. When falsy, the ``GOOGLE_API_KEY``
            environment variable is read instead.
        **kwargs: Extra keyword arguments forwarded to the LM constructor.
            They are merged over the default ``temperature`` of 0.7, so a
            caller-supplied ``temperature`` wins.

    Returns:
        The verified LM instance that was installed through
        ``dspy.settings.configure``, or ``None`` when no API key is
        available, the LM cannot be constructed, or the verification call
        fails. Failures are logged rather than raised.
    """
    logger.info("Attempting to initialize DSPy with model: %s", model_name)

    resolved_api_key = api_key or os.getenv("GOOGLE_API_KEY")
    if not resolved_api_key:
        logger.error(
            "FATAL: GOOGLE_API_KEY not provided as an argument or found in environment variables."
        )
        return None

    # Caller-supplied kwargs take precedence over the default temperature.
    lm_kwargs = {"temperature": 0.7, **kwargs}
    try:
        custom_lm = SyncCustomGeminiDspyLM(
            model=model_name,
            api_key=resolved_api_key,
            **lm_kwargs
        )
    except Exception as e:
        # Boundary handler: report the failure to the caller as None.
        logger.exception("Failed to instantiate SyncCustomGeminiDspyLM: %s", e)
        return None

    try:
        logger.info("Verifying credentials with a test API call...")
        test_response = custom_lm(prompt="Hello, this is a test. Respond with 'OK'.")

        # NOTE(review): assumes the wrapper returns a sequence whose first item
        # is a string carrying an "[ERROR:" marker on failure — confirm against
        # SyncCustomGeminiDspyLM.
        if not test_response or "[ERROR:" in test_response[0]:
            raise ValueError(f"Test call failed. Response: {test_response}")

        logger.info("LM verification successful.")
    except Exception as e:
        logger.exception(
            "FATAL: LM verification failed for model '%s'. "
            "Please check your API key, model name, and network connection. Error: %s",
            model_name,
            e,
        )
        return None

    # Only a verified LM is installed as the global DSPy default.
    dspy.settings.configure(lm=custom_lm)
    logger.info(
        "DSPy successfully configured globally with validated model: %s",
        model_name,
    )

    # Return the LM itself, as the signature declares (the original returned
    # the raw test response here).
    return custom_lm
|
|
|
|
|
# --- State keys --------------------------------------------------------------
# String identifiers for individual pieces of conversation state.
# NOTE(review): presumably used as keys into a per-session state mapping —
# confirm against the callers.
STATE_STAGE = "stage"
STATE_HISTORY = "conversation_history"
STATE_FINAL_SYLLABUS = "final_syllabus_xml"
STATE_EXPLAINER_PROMPT = "explainer_system_prompt"
STATE_EXPLANATION_START_INDEX = "explanation_start_index"
STATE_CURRENT_TITLE = "current_title"
STATE_GENERATED_TITLE = "generated_title"
STATE_RESOURCE_SUMMARY_OVERVIEW = "resource_summary_overview_for_manager"
STATE_RESOURCE_TYPE_FOR_SYLLABUS = "resource_type_for_syllabus_gen"
STATE_RESOURCE_CONTENT_JSON_FOR_SYLLABUS = "resource_content_json_for_syllabus_gen"
STATE_UPLOADED_FILENAMES = "uploaded_filenames_list"
STATE_IS_FIRST_TURN = "is_first_turn"

# Flag-style state keys.
STATE_DISPLAY_SYLLABUS_FLAG = "display_syllabus_flag"
STATE_TRANSITION_EXPLAINER_FLAG = "transition_to_explainer_flag"
|
|
|
|
|
# --- Stage names -------------------------------------------------------------
# String labels for the conversation stages.
# NOTE(review): presumably the values stored under STATE_STAGE — confirm.
STAGE_START = "START"
STAGE_NEGOTIATING = "NEGOTIATING"
STAGE_EXPLAINING = "EXPLAINING"
STAGE_ERROR = "ERROR"
|
|
|
|
|
# --- Chat title settings -----------------------------------------------------
# Title text for a chat that has no other title.
DEFAULT_CHAT_TITLE = "New Chat"

# NOTE(review): units are not visible here — presumably a count of history
# entries before a title is generated, and the number of history entries fed
# to title generation, respectively. Confirm against the code that reads them.
TITLE_GENERATION_THRESHOLD = 4
TITLE_MAX_HISTORY_SNIPPET_FOR_TITLE = 6