# 23RAG7 / evaluation.py
import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from data_processing import load_query_dataset
# Module-level defaults for the ground-truth answer and metric scores
ground_truth_answer = ''
ground_truth_metrics = {}
# Step 1: Helper function to compute cosine similarity
def compute_cosine_similarity(text1, text2):
    if not text1 or not text2:  # Check for empty or None values
        print("Error: One or both input texts are empty. Returning similarity as 0.")
        return 0.0
    vectorizer = TfidfVectorizer(stop_words="english")
    try:
        vectors = vectorizer.fit_transform([text1, text2])
        similarity = cosine_similarity(vectors[0], vectors[1])[0][0]
        return similarity
    except ValueError as e:
        print(f"Error in vectorization: {e}. Returning similarity as 0.")
        return 0.0
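# Illustrative note (not in the original file): the score is the TF-IDF cosine
# similarity in [0, 1]. For example,
#     compute_cosine_similarity("capital of France", "Paris is the capital of France")
# returns a positive score because the texts share non-stop-word tokens, while
# two texts with no shared vocabulary score 0.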
# Step 2: Metric 1 - Context Relevance
def context_relevance(question, relevant_documents):
    # Documents are plain strings here (not LangChain Document objects)
    combined_docs = " ".join([doc for doc in relevant_documents])
    return compute_cosine_similarity(question, combined_docs)
# Step 3: Metric 2 - Context Utilization
def context_utilization(response, relevant_documents):
    # Documents are plain strings here (not LangChain Document objects)
    combined_docs = " ".join([doc for doc in relevant_documents])
    return compute_cosine_similarity(response, combined_docs)
# Step 4: Metric 3 - Completeness
def completeness(response, ground_truth_answer):
    return compute_cosine_similarity(response, ground_truth_answer)
# Step 5: Metric 4 - Adherence
def adherence(response, relevant_documents):
    # Documents are plain strings here (not LangChain Document objects)
    combined_docs = " ".join([doc for doc in relevant_documents])
    response_tokens = set(response.split())
    if not response_tokens:  # Guard against division by zero on an empty response
        return False
    relevant_tokens = set(combined_docs.split())
    supported_tokens = response_tokens.intersection(relevant_tokens)
    # Adherent if at least half of the response tokens appear in the retrieved context
    return len(supported_tokens) / len(response_tokens) >= 0.5
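# Worked example (illustrative, not from the original file): for the response
# "Paris is the capital" with retrieved context containing "Paris", "is" and
# "the" but not "capital", 3 of the 4 response tokens are supported, so the
# ratio is 0.75 and adherence returns True.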
# Step 6: Compute RMSE for metrics
def compute_rmse(predicted_values, ground_truth_values):
    predicted_ = [float(v) for v in predicted_values.values()]
    # Booleans in the ground truth are mapped to soft scores: True -> 0.75, False -> 0.25
    ground_truth_ = [
        (0.75 if v else 0.25) if isinstance(v, bool) else float(v)
        for v in ground_truth_values.values()
    ]
    return np.sqrt(mean_squared_error(ground_truth_, predicted_))
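# Worked example (illustrative): with predicted_values {"adherence": 1.0} and
# ground_truth_values {"adherence": True}, the boolean maps to 0.75 and the
# result is sqrt((1.0 - 0.75) ** 2) = 0.25.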
def calculate_metrics(question, q_dataset, response, docs, time_taken):
    data = load_query_dataset(q_dataset)
    # Look up the ground-truth answer and metric scores for this question
    ground_truth_answer, ground_truth_metrics = retrieve_ground_truths(question, data)
    # Ensure ground_truth_answer is not empty before proceeding
    if ground_truth_answer is None:
        ground_truth_answer = ""  # Default to an empty string if no ground truth is found
    # Predicted metrics
    predicted_metrics = {
        "context_relevance": context_relevance(question, docs),
        "context_utilization": context_utilization(response, docs),
        "completeness": completeness(response, ground_truth_answer),
        "adherence": adherence(response, docs),
    }
    # RMSE can only be computed when ground-truth metric scores were found
    rmse = compute_rmse(predicted_metrics, ground_truth_metrics) if ground_truth_metrics else None
    metrics = {
        "RMSE": rmse,
        "predicted_metrics": predicted_metrics,
        "response_time": time_taken,
        "ground_truth": ground_truth_answer,
        "RAG_model_response": response,
    }
    return metrics
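# Shape of the returned dictionary (values are illustrative only):
# {
#     "RMSE": 0.12,
#     "predicted_metrics": {"context_relevance": 0.41, "context_utilization": 0.38,
#                           "completeness": 0.67, "adherence": True},
#     "response_time": 1.2,
#     "ground_truth": "...",
#     "RAG_model_response": "...",
# }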
def retrieve_ground_truths(question, dataset):
    # Scan every split of the query dataset for a question similar to the one asked
    for split_name, instances in dataset.items():
        print(f"Processing {split_name} split")
        for instance in instances:
            if is_similar(instance['question'], question):
                instance_id = instance['id']
                ground_truth = instance['response']
                ground_truth_metrics_ = {
                    "context_relevance": instance['relevance_score'],
                    "context_utilization": instance['utilization_score'],
                    "completeness": instance['completeness_score'],
                    "adherence": instance['adherence_score']
                }
                print(f"Match found in {split_name} split!")
                print(f"ID: {instance_id}, Response: {ground_truth}")
                return ground_truth, ground_truth_metrics_  # Return the first match immediately
    return None, None
def is_similar(question1, question2, threshold=0.85):
    # Treat two questions as the same if their TF-IDF cosine similarity clears the threshold
    vectorizer = TfidfVectorizer()
    vectors = vectorizer.fit_transform([question1, question2])
    similarity = cosine_similarity(vectors[0], vectors[1])[0][0]
    return similarity >= threshold
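# Minimal usage sketch (not part of the original file). The dataset name,
# documents and response below are placeholders; calculate_metrics expects a
# dataset loadable by data_processing.load_query_dataset.
if __name__ == "__main__":
    sample_docs = [
        "The Eiffel Tower is located in Paris and was completed in 1889.",
        "Paris is the capital city of France.",
    ]
    sample_question = "Where is the Eiffel Tower located?"
    sample_response = "The Eiffel Tower is located in Paris, France."
    # "sample_queries.json" is a hypothetical dataset name used only for illustration.
    results = calculate_metrics(sample_question, "sample_queries.json",
                                sample_response, sample_docs, time_taken=1.2)
    print(results)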