Spaces:
Sleeping
Sleeping
import os | |
import gradio as gr | |
import requests | |
import pandas as pd | |
# Ensure basic_agent.py is in the same directory | |
from basic_agent import BasicAgent | |
import json | |
import tempfile | |
# --- Constants ---
# Base URL of the scoring service. It can be overridden through the API_URL
# environment variable; trailing slashes are stripped so the endpoint URLs
# built below never contain a double slash.
DEFAULT_API_URL = os.getenv(
    "API_URL", "https://agents-course-unit4-scoring.hf.space").rstrip("/")
# Endpoint the question list is fetched from (GET).
QUESTIONS_URL = f"{DEFAULT_API_URL}/questions"
# Endpoint the answers are posted to (POST).
SUBMIT_URL = f"{DEFAULT_API_URL}/submit"
# Marker stored in the "Submitted Answer" field of questions that have not
# been run yet; such entries are excluded from step-by-step submissions.
PLACEHOLDER_UNATTEMPTED = "_NOT_ATTEMPTED_"
# --- Agent Instantiation Helper ---
def get_agent_instance():
    """Create a new ``BasicAgent`` and return it.

    Returns:
        The freshly constructed agent, or ``None`` when the constructor
        raises. In that case the error is printed and also shown to the user
        as a Gradio warning.
    """
    try:
        agent = BasicAgent()
    except Exception as exc:
        print(f"Error instantiating agent: {exc}")
        gr.Warning(f"Error initializing agent: {exc}")
        return None
    else:
        return agent
# --- Original run_and_submit_all function ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch every question, run the agent on each one and submit the answers.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in.

    Returns:
        A ``(status_message, results_dataframe)`` tuple. The data frame has
        the columns "Task ID", "Question" and "Submitted Answer"; it is empty
        when the run stops before any question was processed.
    """
    if not profile:
        gr.Warning("Please Login first.")
        return "Login required.", pd.DataFrame()

    username = profile.username
    print(f"User logged in: {username}")

    agent = get_agent_instance()
    if not agent:
        return "Failed to initialize agent.", pd.DataFrame()

    # Link to the Space's code when running on Spaces, a marker otherwise.
    space_id = os.getenv("SPACE_ID")
    if space_id:
        agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    else:
        agent_code = "local_run"

    # --- Fetch questions ---
    print(f"Fetching questions from: {QUESTIONS_URL}")
    try:
        fetch_response = requests.get(QUESTIONS_URL, timeout=15)
        fetch_response.raise_for_status()
        questions_data = fetch_response.json()
        if not questions_data:
            return "Fetched questions list is empty.", pd.DataFrame()
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as fetch_error:
        return f"Error fetching/decoding questions: {fetch_error}", pd.DataFrame()

    # --- Run the agent ---
    results_log = []      # one row per processed question (shown in the UI)
    answers_payload = []  # only successful answers (sent to the server)
    print(f"Running agent on all {len(questions_data)} questions...")
    for entry in questions_data:
        task_id = entry.get("task_id")
        q_text = entry.get("question")
        if not task_id or q_text is None:
            print(f"Skipping item: {entry}")
            continue
        try:
            print(f"Running agent for Task ID {task_id}...")
            submitted_answer = agent(task_id, q_text)
        except Exception as agent_error:
            # A failing question is logged but never submitted.
            results_log.append({
                "Task ID": task_id,
                "Question": q_text,
                "Submitted Answer": f"AGENT ERROR: {agent_error}",
            })
        else:
            answers_payload.append({
                "task_id": task_id,
                "submitted_answer": submitted_answer,
            })
            results_log.append({
                "Task ID": task_id,
                "Question": q_text,
                "Submitted Answer": submitted_answer,
            })

    # Explicit column list keeps the column order stable.
    results_df = pd.DataFrame(
        results_log, columns=["Task ID", "Question", "Submitted Answer"])
    if not answers_payload:
        return "Agent produced no answers.", results_df

    # --- Submit ---
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"Submitting {len(answers_payload)} answers to: {SUBMIT_URL}")
    print("Submitting data:", json.dumps(submission_data, indent=2))
    submit_timeout = max(60, len(answers_payload) * 2)
    try:
        submit_response = requests.post(
            SUBMIT_URL, json=submission_data, timeout=submit_timeout)
        submit_response.raise_for_status()
        result_data = submit_response.json()
        user = result_data.get('username')
        score = result_data.get('score', 'N/A')
        correct = result_data.get('correct_count', '?')
        attempted = result_data.get('total_attempted', '?')
        message = result_data.get('message', '')
        summary = (
            f"Submission Successful! User: {user}, "
            f"Score: {score}% ({correct}/{attempted}), "
            f"Msg: {message}"
        )
        return summary, results_df
    except Exception as submit_error:
        return f"Submission Failed: {submit_error}", results_df
# --- Step-by-Step Action Functions ---
def load_questions_action(profile: gr.OAuthProfile | None):
    """Fetch the question list and build a fresh, unattempted results log.

    Args:
        profile: OAuth profile of the logged-in user, or ``None``.

    Returns:
        A 4-tuple ``(status_message, results_log, results_dataframe, None)``.
        ``results_log`` is a list of dicts with the keys "Task ID",
        "Question" and "Submitted Answer" (initialised to
        ``PLACEHOLDER_UNATTEMPTED``); the trailing ``None`` resets the
        selected question index. On any failure the log is an empty list and
        the data frame is empty.
    """
    if not profile:
        gr.Warning("Please Login first.")
        return "Login required.", [], pd.DataFrame(), None

    print(f"Fetching questions for {profile.username} from: {QUESTIONS_URL}")
    try:
        response = requests.get(QUESTIONS_URL, timeout=15)
        response.raise_for_status()
        questions_server_data = response.json()
        if not questions_server_data:
            return "Fetched questions list is empty.", [], pd.DataFrame(), None
        # Entries without a task id or without a question are dropped.
        new_results_log = [
            {
                "Task ID": q.get("task_id"),
                "Question": q.get("question"),
                "Submitted Answer": PLACEHOLDER_UNATTEMPTED,
            }
            for q in questions_server_data
            if q.get("task_id") and q.get("question") is not None
        ]
    except Exception as e:
        msg = f"Error fetching questions: {e}"
        # gr.Error only shows up in the UI when it is *raised*; merely
        # constructing it is a no-op. gr.Warning displays the message while
        # still letting this handler return its reset outputs.
        gr.Warning(msg)
        return msg, [], pd.DataFrame(), None

    msg = f"Fetched {len(new_results_log)} questions. Progress reset."
    gr.Info(msg)
    return (
        msg,
        # For results_log_list_state (the single source of truth)
        new_results_log,
        # For results_display_table
        pd.DataFrame(new_results_log, columns=[
            "Task ID", "Question", "Submitted Answer"]),
        None,  # For q_number_input (reset selection)
    )
def run_single_question_action(profile: gr.OAuthProfile | None, q_idx: int | None, current_results_log: list):
    """Run the agent on one question of the current results log.

    Args:
        profile: OAuth profile of the logged-in user, or ``None``.
        q_idx: Zero-based index into ``current_results_log`` of the question
            to run, or ``None`` when nothing is selected.
        current_results_log: List of dicts with the keys "Task ID",
            "Question" and "Submitted Answer".

    Returns:
        A 3-tuple ``(status_message, results_log, results_dataframe)``. On
        success or agent failure the returned log is a copy of the input with
        the entry at ``q_idx`` replaced; on every validation failure the
        input log is returned unchanged.
    """
    if not profile:
        gr.Warning("Please Login first.")
        return "Login required.", current_results_log, pd.DataFrame(current_results_log)
    if not current_results_log:
        gr.Warning("No questions loaded.")
        return "No questions loaded.", current_results_log, pd.DataFrame(current_results_log)
    if q_idx is None:
        gr.Warning("Select question or enter index.")
        return "Invalid index.", current_results_log, pd.DataFrame(current_results_log)
    if not 0 <= q_idx < len(current_results_log):
        return f"Index {q_idx} out of bounds.", current_results_log, pd.DataFrame(current_results_log)

    agent = get_agent_instance()
    if not agent:
        return "Agent init failed.", current_results_log, pd.DataFrame(current_results_log)

    # Question details come from the selected row of the results log.
    item_to_process = current_results_log[q_idx]
    task_id = item_to_process.get("Task ID")
    q_text = item_to_process.get("Question")
    if not task_id or q_text is None:
        return f"Invalid question data at index {q_idx}.", current_results_log, pd.DataFrame(current_results_log)

    # str() guards against non-string questions coming from a progress file.
    print(f"Running for Task ID {task_id} (Index {q_idx}): {str(q_text)[:50]}...")
    try:
        submitted_answer = agent(task_id, q_text)
    except Exception as e:
        agent_failed = True
        submitted_answer = f"AGENT ERROR: {e}"
        status_msg = f"Error on task {task_id}: {e}"
    else:
        agent_failed = False
        status_msg = f"Successfully processed Task ID {task_id}."

    # Work on a copy so the caller's list is not mutated.
    updated_results_log = list(current_results_log)
    updated_results_log[q_idx] = {
        "Task ID": task_id,
        "Question": q_text,
        "Submitted Answer": submitted_answer,
    }

    # An explicit flag is used instead of searching the answer for
    # "AGENT ERROR": the answer may not be a string, and a legitimate answer
    # could contain that text. gr.Error is only displayed when raised, so the
    # failure is surfaced with gr.Warning, which still lets the updated log
    # be returned.
    if agent_failed:
        gr.Warning(status_msg)
    else:
        gr.Info(status_msg)
    return (
        status_msg,
        updated_results_log,
        pd.DataFrame(updated_results_log, columns=[
            "Task ID", "Question", "Submitted Answer"]),
    )
def download_progress_action(results_log_list: list):
    """Serialize the results log to a temporary JSON file for download.

    Args:
        results_log_list: The current results log (list of dicts).

    Returns:
        A ``gr.File`` pointing at the written file, or ``None`` when there is
        nothing to download or the file could not be prepared.
    """
    if not results_log_list:
        gr.Info("No progress to download.")
        return None

    tmp_path = None
    try:
        # delete=False: the file has to outlive this function so that it can
        # be served to the browser.
        with tempfile.NamedTemporaryFile(
                mode="w", delete=False, suffix=".json", encoding='utf-8') as tmpfile:
            tmp_path = tmpfile.name
            json.dump(results_log_list, tmpfile, indent=2)
        # The file is closed (and therefore flushed) before it is handed over.
        gr.Info("Progress file ready.")
        return gr.File(value=tmp_path, label="progress.json")
    except Exception as e:
        # Best-effort cleanup of a partially written file.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        # gr.Error is only shown when raised; gr.Warning displays the message
        # and still lets the handler return None.
        gr.Warning(f"Error preparing download: {e}")
        return None
def load_progress_action(uploaded_file_obj):
    """Load a previously downloaded progress file into the results log.

    Args:
        uploaded_file_obj: Either a filesystem path (``str`` or
            ``os.PathLike``) or an object exposing the path as ``.name``;
            ``None`` when no file was uploaded.

    Returns:
        A 4-tuple ``(status_message, results_log, results_dataframe, None)``;
        the trailing ``None`` resets the selected question index. On failure
        the log is an empty list and the data frame is empty.
    """
    if uploaded_file_obj is None:
        gr.Warning("No file uploaded.")
        return "No file.", [], pd.DataFrame(), None

    required_keys = ("Task ID", "Question", "Submitted Answer")
    try:
        # The upload component in this file is declared with
        # type="filepath", which delivers a plain path string; the ``.name``
        # form is kept for file-like wrappers.
        if isinstance(uploaded_file_obj, (str, os.PathLike)):
            file_path = uploaded_file_obj
        else:
            file_path = uploaded_file_obj.name
        with open(file_path, "r", encoding='utf-8') as f:
            loaded_data = json.load(f)

        is_valid = isinstance(loaded_data, list) and all(
            isinstance(item, dict) and all(k in item for k in required_keys)
            for item in loaded_data
        )
        if not is_valid:
            raise ValueError(
                "Invalid file format. Expects list of {'Task ID': ..., 'Question': ..., 'Submitted Answer': ...}")
    except Exception as e:
        msg = f"Error loading progress: {e}"
        # gr.Error is only shown when raised; gr.Warning displays the message
        # and still lets the handler return its reset outputs.
        gr.Warning(msg)
        return msg, [], pd.DataFrame(), None

    new_results_log_list = loaded_data
    msg = f"Loaded {len(new_results_log_list)} entries from file."
    gr.Info(msg)
    return (
        msg,
        new_results_log_list,
        pd.DataFrame(new_results_log_list, columns=[
            "Task ID", "Question", "Submitted Answer"]),
        None,  # Reset selected index
    )
def submit_current_results_action(profile: gr.OAuthProfile | None, results_log_list: list):
    """Submit every attempted, non-error answer of the results log.

    Entries whose answer equals ``PLACEHOLDER_UNATTEMPTED`` or whose answer
    text contains "AGENT ERROR" are left out of the submission.

    Args:
        profile: OAuth profile of the logged-in user, or ``None``.
        results_log_list: List of dicts with the keys "Task ID", "Question"
            and "Submitted Answer".

    Returns:
        A status message describing the outcome (success summary or failure
        reason).
    """
    if not profile:
        gr.Warning("Please Login first.")
        return "Login required."
    username = profile.username
    if not results_log_list:
        return "No results to submit."

    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local_run"

    answers_payload = [
        {"task_id": e["Task ID"], "submitted_answer": e["Submitted Answer"]}
        for e in results_log_list
        if e["Submitted Answer"] != PLACEHOLDER_UNATTEMPTED
        and "AGENT ERROR" not in str(e.get("Submitted Answer", ""))
    ]
    if not answers_payload:
        return "No attempted (non-error) answers to submit."

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    gr.Info(f"Submitting {len(answers_payload)} answers for '{username}'...")
    print("Submitting data:", json.dumps(submission_data, indent=2))

    try:
        response = requests.post(
            SUBMIT_URL, json=submission_data, timeout=max(60, len(answers_payload) * 2))
        response.raise_for_status()
        result_data = response.json()
    except requests.exceptions.HTTPError as e:
        # NOTE: a requests.Response is falsy for 4xx/5xx statuses, so the
        # response must be compared against None, never truth-tested.
        error_response = e.response
        if error_response is None:
            error_detail = f"HTTP error without a response: {e}"
        else:
            error_detail = f"Server responded with status {error_response.status_code}."
            try:
                error_json = error_response.json()
            except ValueError:
                # Body is not JSON: show the start of the raw text instead.
                error_detail += f" Response: {error_response.text[:500]}"
            else:
                if isinstance(error_json, dict):
                    detail = error_json.get('detail', error_response.text)
                else:
                    detail = error_json
                error_detail += f" Detail: {detail}"
        status_message = f"Submission Failed: {error_detail}"
    except (requests.exceptions.RequestException, ValueError) as e:
        # Timeouts, connection problems and undecodable success bodies.
        status_message = f"Submission Failed: {e}"
    else:
        return (f"Submission Successful! User: {result_data.get('username')}, Score: {result_data.get('score', 'N/A')}% "
                f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')}), Msg: {result_data.get('message', '')}")

    # gr.Error is only shown when raised; gr.Warning displays the message and
    # still lets the handler return the status text.
    gr.Warning(status_message)
    return status_message
# --- Build Gradio Interface ---
# NOTE(review): the original indentation was lost, so the nesting of the
# components inside rows / the accordion below is a reconstruction — confirm
# against the intended layout.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Enhanced Agent Evaluation Runner")
    # ... Instructions markdown ...

    # Single source of truth for the state of all questions and their answers
    results_log_list_state = gr.State([])

    gr.LoginButton()

    with gr.Tabs():
        with gr.TabItem("Step-by-Step Evaluation"):
            gr.Markdown("## Evaluation Workflow")
            with gr.Row():
                load_questions_button = gr.Button(
                    "1. Load Questions from Server",
                    variant="secondary",
                )
                load_q_status = gr.Textbox(
                    label="Load Status",
                    interactive=False,
                    lines=1,
                )

            gr.Markdown("### 2. Select a Question and Run Agent")
            # Main display for questions and answers
            results_display_table = gr.DataFrame(
                label="Questions & Answers (Select row to run agent)",
                headers=["Task ID", "Question", "Submitted Answer"],
                row_count=10,
                wrap=True,
                interactive=True,  # Allows row selection
            )
            with gr.Row():
                q_number_input = gr.Number(
                    label="Selected Question Index",
                    minimum=0,
                    precision=0,
                    step=1,
                    value=None,
                    interactive=True,
                )
                run_single_q_button = gr.Button(
                    "Run Agent for Selected Index",
                    variant="primary",
                )
            single_q_status = gr.Textbox(
                label="Run Single Status",
                interactive=False,
                lines=1,
            )

            with gr.Accordion("3. Manage Full Progress (Download/Upload)", open=False):
                download_file_output = gr.File(
                    label="Download Link",
                    interactive=False,
                )
                download_button = gr.Button("Download All Progress")
                with gr.Row():
                    upload_file_input = gr.File(
                        label="Upload Progress File (JSON)",
                        type="filepath",
                        file_types=[".json"],
                    )
                    load_progress_button = gr.Button("Load Uploaded File")
                upload_status = gr.Textbox(
                    label="Upload Status",
                    interactive=False,
                    lines=1,
                )

            gr.Markdown("### 4. Submit Results")
            submit_step_by_step_button = gr.Button(
                "Submit Attempted Answers",
                variant="primary",
            )
            submit_sbs_status = gr.Textbox(
                label="Submission Status",
                lines=3,
                interactive=False,
            )

        with gr.TabItem("Run All & Submit (Original Batch)"):
            gr.Markdown("## Original Batch Runner")
            original_run_button = gr.Button(
                "Run All Questions & Submit",
                variant="primary",
            )
            original_status_output = gr.Textbox(
                label="Batch Run Status / Result",
                lines=3,
                interactive=False,
            )
            original_results_table = gr.DataFrame(
                label="Batch Run Q&A",
                wrap=True,
                interactive=False,
                headers=["Task ID", "Question", "Submitted Answer"],
            )

    # --- Wire up Step-by-Step controls ---
    load_questions_button.click(
        fn=load_questions_action,
        inputs=[],
        outputs=[
            load_q_status,
            results_log_list_state,
            results_display_table,
            q_number_input,
        ],
    )

    def handle_select_question_from_results_table(evt: gr.SelectData):
        """Translate a table selection event into a row index.

        Returns the first element of a tuple or non-empty list index, the
        index itself when it is an int, and ``None`` for anything else.
        """
        selection = evt.index
        if isinstance(selection, tuple):
            # (row, col) cell selection: keep the row
            return selection[0]
        if isinstance(selection, int):
            return selection
        if isinstance(selection, list) and selection:
            return selection[0]
        return None  # No valid selection

    results_display_table.select(
        fn=handle_select_question_from_results_table,
        inputs=None,
        outputs=[q_number_input],
        show_progress="hidden",
    )

    run_single_q_button.click(
        fn=run_single_question_action,
        inputs=[q_number_input, results_log_list_state],
        outputs=[
            single_q_status,
            results_log_list_state,
            results_display_table,
        ],
    )

    download_button.click(
        download_progress_action,
        [results_log_list_state],
        [download_file_output],
    )

    load_progress_button.click(
        load_progress_action,
        [upload_file_input],
        [
            upload_status,
            results_log_list_state,
            results_display_table,
            q_number_input,
        ],
    )

    submit_step_by_step_button.click(
        submit_current_results_action,
        [results_log_list_state],
        [submit_sbs_status],
    )

    original_run_button.click(
        run_and_submit_all,
        [],
        [original_status_output, original_results_table],
    )
# Script entry point: print a short startup banner with the Space-related
# environment variables, then launch the Gradio app.
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        # SPACE_HOST may already carry the ".hf.space" suffix; only append it
        # when it is missing so the printed URL never ends in
        # ".hf.space.hf.space".
        print(
            f"✅ SPACE_HOST: {space_host_startup}, URL: https://"
            + (space_host_startup
               if space_host_startup.endswith(".hf.space")
               else f"{space_host_startup}.hf.space"))
    else:
        print("ℹ️ SPACE_HOST not found (local run?).")

    if space_id_startup:
        print(
            f"✅ SPACE_ID: {space_id_startup}, Repo: https://huggingface.co/spaces/{space_id_startup}")
    else:
        print("ℹ️ SPACE_ID not found. Repo URL cannot be determined.")

    # Closing rule, same width as the opening banner line.
    print("-"*(60 + len(" App Starting ")) + "\n")
    demo.launch(debug=True)