Spaces:
Running
Running
import os | |
import json | |
import numpy as np | |
from datasets import load_dataset | |
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline | |
import torch | |
from sklearn.metrics import f1_score | |
import re | |
from collections import Counter | |
import string | |
from huggingface_hub import login | |
import gradio as gr | |
import pandas as pd | |
from datetime import datetime | |
def normalize_answer(s): | |
"""Normalize answer for evaluation""" | |
def remove_articles(text): | |
return re.sub(r'\b(a|an|the)\b', ' ', text) | |
def white_space_fix(text): | |
return ' '.join(text.split()) | |
def remove_punc(text): | |
exclude = set(string.punctuation) | |
return ''.join(ch for ch in text if ch not in exclude) | |
def lower(text): | |
return text.lower() | |
return white_space_fix(remove_articles(remove_punc(lower(s)))) | |
def f1_score_qa(prediction, ground_truth): | |
"""Calculate F1 score for QA""" | |
prediction_tokens = normalize_answer(prediction).split() | |
ground_truth_tokens = normalize_answer(ground_truth).split() | |
if len(prediction_tokens) == 0 or len(ground_truth_tokens) == 0: | |
return int(prediction_tokens == ground_truth_tokens) | |
common = Counter(prediction_tokens) & Counter(ground_truth_tokens) | |
num_same = sum(common.values()) | |
if num_same == 0: | |
return 0 | |
precision = 1.0 * num_same / len(prediction_tokens) | |
recall = 1.0 * num_same / len(ground_truth_tokens) | |
f1 = (2 * precision * recall) / (precision + recall) | |
return f1 | |
def exact_match_score(prediction, ground_truth): | |
"""Calculate exact match score""" | |
return normalize_answer(prediction) == normalize_answer(ground_truth) | |
def evaluate_model(): | |
# Authenticate with Hugging Face using the token | |
hf_token = os.getenv("EVAL_TOKEN") | |
if hf_token: | |
try: | |
login(token=hf_token) | |
print("β Authenticated with Hugging Face") | |
except Exception as e: | |
print(f"β Warning: Could not authenticate with HF token: {e}") | |
else: | |
print("β Warning: EVAL_TOKEN not found in environment variables") | |
print("Loading model and tokenizer...") | |
model_name = "AvocadoMuffin/roberta-cuad-qa-v2" | |
try: | |
tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token) | |
model = AutoModelForQuestionAnswering.from_pretrained(model_name, token=hf_token) | |
qa_pipeline = pipeline("question-answering", model=model, tokenizer=tokenizer) | |
print("β Model loaded successfully") | |
return qa_pipeline, hf_token | |
except Exception as e: | |
print(f"β Error loading model: {e}") | |
return None, None | |
def run_evaluation(num_samples, progress=gr.Progress()): | |
"""Run evaluation and return results for Gradio interface""" | |
# Load model | |
qa_pipeline, hf_token = evaluate_model() | |
if qa_pipeline is None: | |
return "β Failed to load model", pd.DataFrame(), None | |
progress(0.1, desc="Loading CUAD dataset...") | |
# Load dataset - use QA format version (JSON, no PDFs) | |
try: | |
# Try the QA-specific version first (much faster, JSON format) | |
dataset = load_dataset("theatticusproject/cuad-qa", trust_remote_code=True, token=hf_token) | |
test_data = dataset["test"] | |
print(f"β Loaded CUAD-QA dataset with {len(test_data)} samples") | |
except Exception as e: | |
try: | |
# Fallback to original but limit to avoid PDF downloads | |
dataset = load_dataset("cuad", split="test[:1000]", trust_remote_code=True, token=hf_token) | |
test_data = dataset | |
print(f"β Loaded CUAD dataset with {len(test_data)} samples") | |
except Exception as e2: | |
return f"β Error loading dataset: {e2}", pd.DataFrame(), None | |
# Limit samples | |
num_samples = min(num_samples, len(test_data)) | |
test_subset = test_data.select(range(num_samples)) | |
progress(0.2, desc=f"Starting evaluation on {num_samples} samples...") | |
# Initialize metrics | |
exact_matches = [] | |
f1_scores = [] | |
predictions = [] | |
# Run evaluation | |
for i, example in enumerate(test_subset): | |
progress((0.2 + 0.7 * i / num_samples), desc=f"Processing sample {i+1}/{num_samples}") | |
try: | |
context = example["context"] | |
question = example["question"] | |
answers = example["answers"] | |
# Get model prediction | |
result = qa_pipeline(question=question, context=context) | |
predicted_answer = result["answer"] | |
# Get ground truth answers | |
if answers["text"] and len(answers["text"]) > 0: | |
ground_truth = answers["text"][0] if isinstance(answers["text"], list) else answers["text"] | |
else: | |
ground_truth = "" | |
# Calculate metrics | |
em = exact_match_score(predicted_answer, ground_truth) | |
f1 = f1_score_qa(predicted_answer, ground_truth) | |
exact_matches.append(em) | |
f1_scores.append(f1) | |
predictions.append({ | |
"Sample_ID": i+1, | |
"Question": question[:100] + "..." if len(question) > 100 else question, | |
"Predicted_Answer": predicted_answer, | |
"Ground_Truth": ground_truth, | |
"Exact_Match": em, | |
"F1_Score": round(f1, 3), | |
"Confidence": round(result["score"], 3) | |
}) | |
except Exception as e: | |
print(f"Error processing sample {i}: {e}") | |
continue | |
progress(0.9, desc="Calculating final metrics...") | |
# Calculate final metrics | |
if len(exact_matches) == 0: | |
return "β No samples were successfully processed", pd.DataFrame(), None | |
avg_exact_match = np.mean(exact_matches) * 100 | |
avg_f1_score = np.mean(f1_scores) * 100 | |
# Create results summary | |
results_summary = f""" | |
# π CUAD Model Evaluation Results | |
## π― Overall Performance | |
- **Model**: AvocadoMuffin/roberta-cuad-qa-v3 | |
- **Dataset**: CUAD (Contract Understanding Atticus Dataset) | |
- **Samples Evaluated**: {len(exact_matches)} | |
- **Evaluation Date**: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | |
## π Metrics | |
- **Exact Match Score**: {avg_exact_match:.2f}% | |
- **F1 Score**: {avg_f1_score:.2f}% | |
## π Performance Analysis | |
- **High Confidence Predictions**: {len([p for p in predictions if p['Confidence'] > 0.8])} ({len([p for p in predictions if p['Confidence'] > 0.8])/len(predictions)*100:.1f}%) | |
- **Perfect Matches**: {len([p for p in predictions if p['Exact_Match'] == 1])} ({len([p for p in predictions if p['Exact_Match'] == 1])/len(predictions)*100:.1f}%) | |
- **High F1 Scores (>0.8)**: {len([p for p in predictions if p['F1_Score'] > 0.8])} ({len([p for p in predictions if p['F1_Score'] > 0.8])/len(predictions)*100:.1f}%) | |
""" | |
# Create detailed results DataFrame | |
df = pd.DataFrame(predictions) | |
# Save results to file | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
results_file = f"cuad_evaluation_results_{timestamp}.json" | |
detailed_results = { | |
"model_name": "AvocadoMuffin/roberta-cuad-qa-v3", | |
"dataset": "cuad", | |
"num_samples": len(exact_matches), | |
"exact_match_score": avg_exact_match, | |
"f1_score": avg_f1_score, | |
"evaluation_date": datetime.now().isoformat(), | |
"predictions": predictions | |
} | |
try: | |
with open(results_file, "w") as f: | |
json.dump(detailed_results, f, indent=2) | |
print(f"β Results saved to {results_file}") | |
except Exception as e: | |
print(f"β Warning: Could not save results file: {e}") | |
results_file = None | |
progress(1.0, desc="β Evaluation completed!") | |
return results_summary, df, results_file | |
def create_gradio_interface(): | |
"""Create Gradio interface for CUAD evaluation""" | |
with gr.Blocks(title="CUAD Model Evaluator", theme=gr.themes.Soft()) as demo: | |
gr.HTML(""" | |
<div style="text-align: center; padding: 20px;"> | |
<h1>ποΈ CUAD Model Evaluation Dashboard</h1> | |
<p>Evaluate your CUAD (Contract Understanding Atticus Dataset) Question Answering model</p> | |
<p><strong>Model:</strong> AvocadoMuffin/roberta-cuad-qa-v2</p> | |
</div> | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.HTML("<h3>βοΈ Evaluation Settings</h3>") | |
num_samples = gr.Slider( | |
minimum=10, | |
maximum=500, | |
value=100, | |
step=10, | |
label="Number of samples to evaluate", | |
info="Choose between 10-500 samples (more samples = more accurate but slower)" | |
) | |
evaluate_btn = gr.Button( | |
"π Start Evaluation", | |
variant="primary", | |
size="lg" | |
) | |
gr.HTML(""" | |
<div style="margin-top: 20px; padding: 15px; background-color: #f0f0f0; border-radius: 8px;"> | |
<h4>π What this evaluates:</h4> | |
<ul> | |
<li><strong>Exact Match</strong>: Percentage of perfect predictions</li> | |
<li><strong>F1 Score</strong>: Token-level overlap between prediction and ground truth</li> | |
<li><strong>Confidence</strong>: Model's confidence in its predictions</li> | |
</ul> | |
</div> | |
""") | |
with gr.Column(scale=2): | |
gr.HTML("<h3>π Results</h3>") | |
results_summary = gr.Markdown( | |
value="Click 'π Start Evaluation' to begin...", | |
label="Evaluation Summary" | |
) | |
gr.HTML("<hr>") | |
with gr.Row(): | |
gr.HTML("<h3>π Detailed Results</h3>") | |
with gr.Row(): | |
detailed_results = gr.Dataframe( | |
label="Sample-by-Sample Results", | |
interactive=False, | |
wrap=True | |
) | |
with gr.Row(): | |
download_file = gr.File( | |
label="π₯ Download Complete Results (JSON)", | |
visible=False | |
) | |
# Event handlers | |
def handle_evaluation(num_samples): | |
summary, df, file_path = run_evaluation(num_samples) | |
if file_path and os.path.exists(file_path): | |
return summary, df, gr.update(visible=True, value=file_path) | |
else: | |
return summary, df, gr.update(visible=False) | |
evaluate_btn.click( | |
fn=handle_evaluation, | |
inputs=[num_samples], | |
outputs=[results_summary, detailed_results, download_file], | |
show_progress=True | |
) | |
# Footer | |
gr.HTML(""" | |
<div style="text-align: center; margin-top: 30px; padding: 20px; color: #666;"> | |
<p>π€ Powered by Hugging Face Transformers & Gradio</p> | |
<p>π CUAD Dataset by The Atticus Project</p> | |
</div> | |
""") | |
return demo | |
if __name__ == "__main__": | |
print("CUAD Model Evaluation with Gradio Interface") | |
print("=" * 50) | |
# Check if CUDA is available | |
if torch.cuda.is_available(): | |
print(f"β CUDA available: {torch.cuda.get_device_name(0)}") | |
else: | |
print("! Running on CPU") | |
# Create and launch Gradio interface | |
demo = create_gradio_interface() | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=True, | |
debug=True | |
) |