# viettelpay-chatbot / src / evaluation / intent_classification.py
# minhan6559's picture
# Upload 73 files
# 60d1d13 verified
"""
Simplified Intent Classification Evaluation for ViettelPay AI Agent
Removed pattern-based generation, improved chunk mixing, and configurable conversations per chunk
"""
import json
import os
import sys
import argparse
import time
import random
from typing import Dict, List, Optional
from pathlib import Path
from collections import defaultdict, Counter
import pandas as pd
from tqdm import tqdm
import re
import numpy as np
# Load environment variables from .env file
from dotenv import load_dotenv
load_dotenv()
# Add the project root to Python path so we can import from src
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
# Import existing components
from src.evaluation.prompts import INTENT_CLASSIFICATION_CONVERSATION_GENERATION_PROMPT
from src.knowledge_base.viettel_knowledge_base import ViettelKnowledgeBase
from src.llm.llm_client import LLMClientFactory
from src.agent.nodes import classify_intent_node, ViettelPayState
from langchain_core.messages import HumanMessage
class IntentDatasetCreator:
    """Create a synthetic intent-classification dataset from knowledge-base chunks.

    Two generation strategies are used: conversations grounded in a single
    chunk, and conversations that mix 2-3 chunks so the user switches topic.
    """

    def __init__(
        self,
        gemini_api_key: str,
        knowledge_base: Optional["ViettelKnowledgeBase"] = None,
    ):
        """Create the Gemini client and an empty dataset skeleton.

        Args:
            gemini_api_key: API key passed to ``LLMClientFactory.create_client``.
            knowledge_base: Knowledge base whose ``chroma_retriever`` is read by
                ``get_all_chunks``; required before creating a dataset.
        """
        self.llm_client = LLMClientFactory.create_client(
            "gemini", api_key=gemini_api_key, model="gemini-2.0-flash"
        )
        self.knowledge_base = knowledge_base
        self.dataset = {
            "conversations": {},
            "generation_methods": {},
            "intent_distribution": {},
            "metadata": {
                "total_conversations": 0,
                "total_user_messages": 0,
                "creation_timestamp": time.time(),
            },
        }
        print("✅ IntentDatasetCreator initialized (simplified version)")

    def generate_json_response(
        self, prompt: str, max_retries: int = 3
    ) -> Optional[Dict]:
        """Ask the LLM for a response and parse it as a JSON object.

        The first ``{`` through the last ``}`` of the response is parsed; when
        no braces are present the whole response is parsed instead.

        Args:
            prompt: Prompt sent to the LLM client.
            max_retries: Maximum number of attempts.

        Returns:
            The parsed JSON object, or ``None`` when every attempt failed
            (empty response, invalid JSON, non-object JSON, or API error).
        """
        for attempt in range(max_retries):
            try:
                response = self.llm_client.generate(prompt, temperature=0.1)
                if not response:
                    continue
                response_text = response.strip()
                json_match = re.search(r"\{.*\}", response_text, re.DOTALL)
                json_text = json_match.group() if json_match else response_text
                parsed = json.loads(json_text)
                if isinstance(parsed, dict):
                    return parsed
                # Valid JSON but not an object (e.g. a bare list): callers
                # expect a dict, so treat it like a parse failure and retry.
                print(
                    f"⚠️ JSON parsing error (attempt {attempt + 1}): "
                    f"expected a JSON object, got {type(parsed).__name__}"
                )
            except json.JSONDecodeError as e:
                print(f"⚠️ JSON parsing error (attempt {attempt + 1}): {e}")
                if attempt == max_retries - 1:
                    print(f"❌ Failed to parse JSON after {max_retries} attempts")
            except Exception as e:
                print(f"⚠️ API error (attempt {attempt + 1}): {e}")
                # Exponential backoff, but only when another attempt follows.
                if attempt < max_retries - 1:
                    time.sleep(2**attempt)
        return None

    def get_all_chunks(self) -> List[Dict]:
        """Return every usable, de-duplicated chunk from the vectorstore.

        A chunk is kept when its stripped content is longer than 100
        characters and its first 300 characters have not been seen before.
        Documents without text are skipped.

        Returns:
            A list of ``{"id", "content", "metadata"}`` dicts; an empty list if
            the vectorstore could not be read.

        Raises:
            ValueError: If no knowledge base was provided.
        """
        print(f"📚 Retrieving ALL chunks from ChromaDB vectorstore...")
        if not self.knowledge_base:
            raise ValueError("Knowledge base not provided.")
        try:
            retriever = getattr(self.knowledge_base, "chroma_retriever", None)
            if not retriever:
                raise ValueError("ChromaDB retriever not found in knowledge base")
            all_docs = retriever.vectorstore.get(include=["documents", "metadatas"])
            documents = all_docs["documents"] or []
            metadatas = all_docs["metadatas"] or [None] * len(documents)

            all_chunks = []
            seen_prefixes = set()
            for content, metadata in zip(documents, metadatas):
                # A missing document must not abort the whole retrieval.
                if not isinstance(content, str):
                    continue
                # Compare the prefix itself rather than its hash so two
                # different chunks can never collide.
                prefix = content[:300]
                if prefix in seen_prefixes or len(content.strip()) <= 100:
                    continue
                all_chunks.append(
                    {
                        "id": f"chunk_{len(all_chunks)}",
                        "content": content,
                        "metadata": metadata or {},
                    }
                )
                seen_prefixes.add(prefix)
            print(f"✅ Retrieved {len(all_chunks)} unique chunks from ChromaDB")
            return all_chunks
        except Exception as e:
            print(f"❌ Error accessing ChromaDB: {e}")
            return []

    @staticmethod
    def _collect_valid_conversations(
        raw_conversations,
        id_prefix: str,
        generation_method: str,
        chunks: List[Dict],
    ) -> List[Dict]:
        """Keep conversations that have at least one turn with user and intent.

        IDs are ``{id_prefix}_{index}`` where ``index`` is the position in the
        raw LLM output, so skipped entries leave gaps rather than renumbering.
        """
        valid_conversations = []
        if not isinstance(raw_conversations, list):
            return valid_conversations
        for index, conversation in enumerate(raw_conversations):
            if not isinstance(conversation, dict):
                continue
            turns = conversation.get("turns")
            if not isinstance(turns, list):
                continue
            valid_turns = [
                turn
                for turn in turns
                if isinstance(turn, dict) and "user" in turn and "intent" in turn
            ]
            if valid_turns:
                valid_conversations.append(
                    {
                        "id": f"{id_prefix}_{index}",
                        "turns": valid_turns,
                        "generation_method": generation_method,
                        "source_chunks": [c["id"] for c in chunks],
                        "chunk_metadata": [c["metadata"] for c in chunks],
                    }
                )
        return valid_conversations

    @staticmethod
    def _plan_multi_chunk_groups(chunks: List[Dict]) -> List[List[Dict]]:
        """Split ``chunks`` into consecutive groups of 2 or 3, using every chunk.

        Group sizes are random, except near the end where the size is forced
        so that no single chunk is left over. Only an input of exactly one
        chunk yields a group of size 1.
        """
        groups = []
        start = 0
        while start < len(chunks):
            remaining = len(chunks) - start
            if remaining <= 3:
                size = remaining
            elif remaining == 4:
                size = 2  # 2 + 2; taking 3 would strand one chunk
            else:
                size = random.choice([2, 3])
            groups.append(chunks[start : start + size])
            start += size
        return groups

    def generate_single_chunk_conversations(
        self, chunk: Dict, num_conversations: int = 3
    ) -> List[Dict]:
        """Generate conversations grounded in one chunk.

        Args:
            chunk: Chunk dict with ``id``, ``content`` and ``metadata``.
            num_conversations: Number of conversations requested from the LLM.

        Returns:
            The valid conversations (possibly empty).
        """
        generation_instruction = "Tạo cuộc hội thoại tập trung vào chủ đề chính của tài liệu. Bao gồm cả các intent phổ biến như greeting, unclear, human_request để tăng tính đa dạng"
        prompt = INTENT_CLASSIFICATION_CONVERSATION_GENERATION_PROMPT.format(
            num_conversations=num_conversations,
            content=chunk["content"],
            generation_instruction=generation_instruction,
        )
        response_json = self.generate_json_response(prompt)
        if not response_json or "conversations" not in response_json:
            print(f"⚠️ No valid conversations generated for chunk {chunk['id']}")
            return []
        return self._collect_valid_conversations(
            response_json["conversations"],
            f"single_{chunk['id']}",
            "single_chunk",
            [chunk],
        )

    def generate_multi_chunk_conversations(
        self, chunks: List[Dict], num_conversations: int = 3
    ) -> List[Dict]:
        """Generate conversations that combine several chunks (2-3 expected).

        Args:
            chunks: Chunk dicts whose contents are concatenated as topics.
            num_conversations: Number of conversations requested from the LLM.

        Returns:
            The valid conversations (possibly empty).
        """
        chunk_ids = [c["id"] for c in chunks]
        # Each chunk becomes a numbered topic section in the prompt content.
        combined_content = "".join(
            f"\n\n--- Chủ đề {i+1} ---\n" + chunk["content"]
            for i, chunk in enumerate(chunks)
        )
        generation_instruction = f"Tạo cuộc hội thoại tự nhiên kết hợp {len(chunks)} chủ đề khác nhau. Người dùng có thể chuyển từ chủ đề này sang chủ đề khác. Đặc biệt bao gồm các intent như greeting, unclear, human_request để cuộc hội thoại thực tế hơn"
        prompt = INTENT_CLASSIFICATION_CONVERSATION_GENERATION_PROMPT.format(
            num_conversations=num_conversations,
            content=combined_content,
            generation_instruction=generation_instruction,
        )
        response_json = self.generate_json_response(prompt)
        if not response_json or "conversations" not in response_json:
            print(f"⚠️ No valid conversations generated for chunks {chunk_ids}")
            return []
        valid_conversations = self._collect_valid_conversations(
            response_json["conversations"],
            f"multi_{'-'.join(chunk_ids)}",
            "multi_chunk",
            chunks,
        )
        print(
            f"✅ Generated {len(valid_conversations)} conversations for multi-chunk {chunk_ids}"
        )
        return valid_conversations

    def create_intent_dataset(
        self,
        num_conversations_per_chunk: int = 3,
        save_path: str = "evaluation_data/datasets/intent_classification/viettelpay_intent_dataset.json",
    ) -> Dict:
        """Generate the dataset with both strategies and save it as JSON.

        Chunks are shuffled; the first 60% feed single-chunk generation and
        the rest are grouped 2-3 at a time for multi-chunk generation.

        Args:
            num_conversations_per_chunk: Conversations requested per LLM call.
            save_path: Destination JSON file; parent directories are created.

        Returns:
            The populated dataset dict (also stored on ``self.dataset``).

        Raises:
            ValueError: If no chunks could be retrieved.
        """
        print(f"\n🚀 Creating intent classification dataset...")
        print(f" Conversations per chunk: {num_conversations_per_chunk}")

        # Step 1: Get all chunks
        all_chunks = self.get_all_chunks()
        if not all_chunks:
            raise ValueError("No chunks found in knowledge base!")
        total_chunks = len(all_chunks)
        print(f"✅ Using all {total_chunks} chunks and shuffle them")
        random.shuffle(all_chunks)

        # Step 2: Split chunks for two strategies (60% single, 40% multi)
        split_point = int(total_chunks * 0.6)
        single_chunks = all_chunks[:split_point]
        multi_chunks = all_chunks[split_point:]
        # Groups are planned up front so the estimate below is an exact
        # integer and no chunk is dropped at the end of the list.
        multi_groups = self._plan_multi_chunk_groups(multi_chunks)
        print(f"📊 Distribution plan:")
        print(
            f" • Single chunk: {len(single_chunks)} chunks → ~{len(single_chunks) * num_conversations_per_chunk} conversations"
        )
        print(
            f" • Multi chunk: {len(multi_chunks)} chunks → ~{len(multi_groups) * num_conversations_per_chunk} conversations"
        )

        all_conversations = []

        # Step 3: Generate single-chunk conversations
        print(f"\n💬 Generating single-chunk conversations...")
        for chunk in tqdm(single_chunks, desc="Single-chunk conversations"):
            all_conversations.extend(
                self.generate_single_chunk_conversations(
                    chunk, num_conversations_per_chunk
                )
            )
            time.sleep(0.1)

        # Step 4: Generate multi-chunk conversations (2-3 chunks randomly)
        print(f"\n🔀 Generating multi-chunk conversations...")
        for chunk_group in multi_groups:
            if len(chunk_group) >= 2:
                conversations = self.generate_multi_chunk_conversations(
                    chunk_group, num_conversations_per_chunk
                )
            else:
                # Only happens when the multi share is a single chunk; use it
                # for a single-chunk conversation instead of discarding it.
                conversations = self.generate_single_chunk_conversations(
                    chunk_group[0], num_conversations_per_chunk
                )
            all_conversations.extend(conversations)
            time.sleep(0.1)

        # Step 5: Track generation methods and intent distribution
        method_stats = Counter(conv["generation_method"] for conv in all_conversations)
        intent_counts = Counter(
            turn["intent"] for conv in all_conversations for turn in conv["turns"]
        )

        # Step 6: Populate dataset structure
        self.dataset["conversations"] = {conv["id"]: conv for conv in all_conversations}
        self.dataset["generation_methods"] = dict(method_stats)
        self.dataset["intent_distribution"] = dict(intent_counts)

        # Step 7: Update metadata
        total_user_messages = sum(len(conv["turns"]) for conv in all_conversations)
        self.dataset["metadata"].update(
            {
                "total_conversations": len(all_conversations),
                "total_user_messages": total_user_messages,
                "chunks_used": total_chunks,
                "conversations_per_chunk": num_conversations_per_chunk,
                "generation_distribution": dict(method_stats),
                "completion_timestamp": time.time(),
            }
        )

        # Step 8: Save dataset
        os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
        with open(save_path, "w", encoding="utf-8") as f:
            json.dump(self.dataset, f, ensure_ascii=False, indent=2)

        print(f"\n✅ Intent classification dataset created successfully!")
        print(f" 📁 Saved to: {save_path}")
        print(f" 📊 Statistics:")
        print(f" • Total conversations: {len(all_conversations)}")
        print(f" • Total user messages: {total_user_messages}")
        print(f" • Conversations per chunk: {num_conversations_per_chunk}")
        print(f" • Generation methods: {dict(method_stats)}")
        print(f" • Intent distribution: {dict(intent_counts)}")
        return self.dataset
class IntentClassificationEvaluator:
    """Evaluate intent classification with per-method and per-flow breakdowns."""

    def __init__(self, dataset: Dict, llm_client):
        """Store the dataset and LLM client and define the intent taxonomy.

        Args:
            dataset: Dataset dict with a ``"conversations"`` mapping, as written
                by the dataset creator.
            llm_client: Client passed through to ``classify_intent_node``.
        """
        self.dataset = dataset
        self.llm_client = llm_client
        # Define expected intents
        self.expected_intents = [
            "greeting",
            "faq",
            "error_help",
            "procedure_guide",
            "human_request",
            "out_of_scope",
            "unclear",
        ]
        # Critical intents for business
        self.critical_intents = ["error_help", "human_request"]
        # Define flow mappings based on agent routing logic
        self.script_based_intents = {
            "greeting",
            "out_of_scope",
            "human_request",
            "unclear",
        }
        self.knowledge_based_intents = {
            "faq",
            "error_help",
            "procedure_guide",
        }

    def _get_intent_flow(self, intent: str) -> str:
        """Map an intent to ``"script_based"``, ``"knowledge_based"`` or ``"unknown"``."""
        if intent in self.script_based_intents:
            return "script_based"
        if intent in self.knowledge_based_intents:
            return "knowledge_based"
        return "unknown"

    def _make_json_serializable(self, obj):
        """Recursively convert numpy values to native Python types.

        Dicts, lists and tuples are walked (tuples become lists); numpy
        scalars and arrays are converted when numpy is importable. Anything
        else is returned unchanged.
        """
        try:
            import numpy as np
        except ImportError:
            np = None
        if isinstance(obj, dict):
            return {k: self._make_json_serializable(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [self._make_json_serializable(item) for item in obj]
        if np is not None:
            if isinstance(obj, np.bool_):
                return bool(obj)
            if isinstance(obj, np.integer):
                return int(obj)
            if isinstance(obj, np.floating):
                return float(obj)
            if isinstance(obj, np.ndarray):
                return obj.tolist()
        return obj

    def calculate_essential_metrics(
        self, ground_truth: List[str], predictions: List[str]
    ) -> Dict:
        """Compute accuracy, macro precision/recall/F1 and per-class metrics.

        Args:
            ground_truth: Expected intent per message.
            predictions: Predicted intent per message, aligned with
                ``ground_truth``.

        Returns:
            A dict with ``overall_accuracy`` and, when scikit-learn is
            installed, ``macro_precision``, ``macro_recall``, ``macro_f1``,
            ``per_class_metrics`` and ``critical_intent_recall``. Empty input
            yields zeroed metrics instead of raising.

        Raises:
            ValueError: If the two lists differ in length.
        """
        ground_truth = list(ground_truth)
        predictions = list(predictions)
        if len(ground_truth) != len(predictions):
            raise ValueError(
                "ground_truth and predictions must have the same length "
                f"({len(ground_truth)} != {len(predictions)})"
            )
        if not predictions:
            # Nothing to score: avoid a division by zero in the fallback path.
            return {
                "overall_accuracy": 0.0,
                "macro_precision": 0.0,
                "macro_recall": 0.0,
                "macro_f1": 0.0,
                "per_class_metrics": {},
                "critical_intent_recall": {},
            }

        try:
            from sklearn.metrics import accuracy_score, precision_recall_fscore_support
        except ImportError:
            print("⚠️ scikit-learn not installed. Using basic accuracy only.")
            correct = sum(
                1 for gt, pred in zip(ground_truth, predictions) if gt == pred
            )
            return {"overall_accuracy": float(correct / len(predictions))}

        overall_accuracy = accuracy_score(ground_truth, predictions)
        # Passing the label order explicitly guarantees that index i of the
        # per-class arrays really belongs to unique_labels[i].
        unique_labels = sorted(set(ground_truth + predictions))

        # Calculate macro metrics (equal weight per intent)
        macro_precision, macro_recall, macro_f1, _ = precision_recall_fscore_support(
            ground_truth,
            predictions,
            labels=unique_labels,
            average="macro",
            zero_division=0,
        )
        # Calculate per-class metrics
        per_class_arrays = precision_recall_fscore_support(
            ground_truth,
            predictions,
            labels=unique_labels,
            average=None,
            zero_division=0,
        )
        per_class_metrics = {
            label: {
                "precision": float(precision),
                "recall": float(recall),
                "f1": float(f1),
                "support": int(support),
            }
            for label, precision, recall, f1, support in zip(
                unique_labels, *per_class_arrays
            )
        }
        # Calculate critical intent recall
        critical_recall = {
            intent: float(per_class_metrics[intent]["recall"])
            for intent in self.critical_intents
            if intent in per_class_metrics
        }
        return {
            "overall_accuracy": float(overall_accuracy),
            "macro_precision": float(macro_precision),
            "macro_recall": float(macro_recall),
            "macro_f1": float(macro_f1),
            "per_class_metrics": per_class_metrics,
            "critical_intent_recall": critical_recall,
        }

    def evaluate_intent_classification(self) -> Dict:
        """Classify every user turn in the dataset and aggregate the metrics.

        Each turn is classified on its own through ``classify_intent_node``.
        When classification raises, the prediction falls back to ``"unclear"``
        for the aggregate metrics, the turn is recorded with an ``"error"``
        field, and it never counts as correct for its conversation.

        Returns:
            A JSON-serializable dict with overall, per-generation-method and
            per-flow metrics, per-conversation results and intent distributions.
        """
        print(f"\n🎯 Running intent classification evaluation...")
        conversations = self.dataset["conversations"]

        # Initialize tracking
        all_predictions = []
        all_ground_truth = []
        method_results = defaultdict(lambda: {"predictions": [], "ground_truth": []})
        flow_results = defaultdict(lambda: {"predictions": [], "ground_truth": []})
        conversation_results = {}

        # Process each conversation
        for conv_id, conv_data in tqdm(
            conversations.items(), desc="Evaluating conversations"
        ):
            generation_method = conv_data.get("generation_method", "unknown")
            turns = conv_data["turns"]
            turn_records = []
            correct_predictions = 0

            for turn_idx, turn in enumerate(turns):
                user_message = turn["user"]
                ground_truth_intent = turn["intent"]
                error_message = None
                try:
                    # Build the single-message state expected by the node.
                    state = ViettelPayState(
                        messages=[HumanMessage(content=user_message)]
                    )
                    result_state = classify_intent_node(state, self.llm_client)
                    predicted_intent = result_state.get("intent", "unclear")
                except Exception as e:
                    print(f"⚠️ Error processing turn {turn_idx} in {conv_id}: {e}")
                    # Use "unclear" as fallback prediction
                    predicted_intent = "unclear"
                    error_message = str(e)

                is_correct = (
                    error_message is None and predicted_intent == ground_truth_intent
                )
                if is_correct:
                    correct_predictions += 1

                # Overall, method-specific and flow-specific tracking; flows
                # are bucketed by the ground-truth intent.
                ground_truth_flow = self._get_intent_flow(ground_truth_intent)
                all_predictions.append(predicted_intent)
                all_ground_truth.append(ground_truth_intent)
                for bucket in (
                    method_results[generation_method],
                    flow_results[ground_truth_flow],
                ):
                    bucket["predictions"].append(predicted_intent)
                    bucket["ground_truth"].append(ground_truth_intent)

                record = {
                    "turn": turn_idx + 1,
                    "user_message": user_message,
                    "ground_truth": ground_truth_intent,
                    "predicted": predicted_intent,
                    "correct": is_correct,
                }
                if error_message is not None:
                    record["error"] = error_message
                turn_records.append(record)

            conversation_results[conv_id] = {
                "turns": turn_records,
                "accuracy": float(correct_predictions / len(turns) if turns else 0),
                "generation_method": generation_method,
            }

        # Calculate overall metrics
        overall_metrics = self.calculate_essential_metrics(
            all_ground_truth, all_predictions
        )

        def summarize(grouped: Dict) -> Dict:
            """Compute metrics for every non-empty group of predictions."""
            summary = {}
            for name, data in grouped.items():
                if data["predictions"]:
                    summary[name] = self.calculate_essential_metrics(
                        data["ground_truth"], data["predictions"]
                    )
                    summary[name]["total_messages"] = len(data["predictions"])
            return summary

        results = {
            "overall_metrics": overall_metrics,
            "method_specific_metrics": summarize(method_results),
            "flow_specific_metrics": summarize(flow_results),
            "conversation_results": conversation_results,
            "intent_distribution": {
                "ground_truth": dict(Counter(all_ground_truth)),
                "predicted": dict(Counter(all_predictions)),
            },
            "generation_methods": self.dataset.get("generation_methods", {}),
        }
        # Make sure all values are JSON serializable
        return self._make_json_serializable(results)

    def _print_group_table(
        self, label_header: str, group_metrics: Dict, name_suffix: str = ""
    ) -> None:
        """Print an accuracy / macro-F1 / message-count table for metric groups."""
        if not group_metrics:
            return
        print(
            f"{label_header:<20} {'Accuracy':<10} {'Macro F1':<10} {'Messages':<10}"
        )
        print("-" * 55)
        for name, metrics in group_metrics.items():
            display_name = f"{name}{name_suffix}"
            accuracy = metrics["overall_accuracy"]
            macro_f1 = metrics.get("macro_f1", 0)
            total_msgs = metrics["total_messages"]
            print(
                f"{display_name:<20} {accuracy:<10.3f} {macro_f1:<10.3f} {total_msgs:<10}"
            )

    def print_evaluation_results(self, results: Dict):
        """Print a human-readable report for ``evaluate_intent_classification`` output.

        Args:
            results: Dict with ``overall_metrics``, ``method_specific_metrics``,
                ``flow_specific_metrics`` and ``intent_distribution``.
        """
        print(f"\n🎯 INTENT CLASSIFICATION EVALUATION RESULTS")
        print("=" * 60)

        # Overall performance
        overall = results["overall_metrics"]
        print(f"\n📊 Overall Performance:")
        print(f" Accuracy: {overall['overall_accuracy']:.3f}")
        if "macro_precision" in overall:
            print(f" Macro Precision: {overall['macro_precision']:.3f}")
            print(f" Macro Recall: {overall['macro_recall']:.3f}")
            print(f" Macro F1: {overall['macro_f1']:.3f}")

        # Per-class performance
        if "per_class_metrics" in overall:
            print(f"\n📋 Per-Class Performance:")
            print(
                f"{'Intent':<15} {'Precision':<10} {'Recall':<10} {'F1':<10} {'Support':<10}"
            )
            print("-" * 65)
            per_class = overall["per_class_metrics"]
            # Expected intents first, then any label outside the taxonomy so
            # unexpected dataset or model labels are not silently hidden.
            unexpected = sorted(
                label for label in per_class if label not in self.expected_intents
            )
            for intent in list(self.expected_intents) + unexpected:
                if intent in per_class:
                    metrics = per_class[intent]
                    print(
                        f"{intent:<15} {metrics['precision']:<10.3f} {metrics['recall']:<10.3f} {metrics['f1']:<10.3f} {metrics['support']:<10}"
                    )

        # Critical intents performance
        if "critical_intent_recall" in overall:
            print(f"\n🚨 Critical Intent Performance:")
            for intent, recall in overall["critical_intent_recall"].items():
                status = "✅" if recall >= 0.85 else "⚠️" if recall >= 0.75 else "❌"
                print(f" {status} {intent}: Recall = {recall:.3f}")

        # Method-specific performance
        print(f"\n🔄 Performance by Generation Method:")
        method_metrics = results["method_specific_metrics"]
        self._print_group_table("Method", method_metrics)

        # Flow-specific performance
        print(f"\n🔀 Performance by Agent Flow:")
        flow_metrics = results["flow_specific_metrics"]
        self._print_group_table("Flow Type", flow_metrics, name_suffix="_flow")

        # Intent distribution comparison
        print(f"\n📈 Intent Distribution:")
        gt_dist = results["intent_distribution"]["ground_truth"]
        pred_dist = results["intent_distribution"]["predicted"]
        print(f"{'Intent':<15} {'Ground Truth':<15} {'Predicted':<15}")
        print("-" * 50)
        for intent in sorted(set(gt_dist) | set(pred_dist)):
            gt_count = gt_dist.get(intent, 0)
            pred_count = pred_dist.get(intent, 0)
            print(f"{intent:<15} {gt_count:<15} {pred_count:<15}")

        # Method insights
        print(f"\n💡 Method-Specific Insights:")
        if method_metrics:
            method_accuracies = {
                method: metrics["overall_accuracy"]
                for method, metrics in method_metrics.items()
            }
            # Best/worst is only meaningful with two or more methods, which
            # mirrors how the flow insights below are reported.
            if len(method_accuracies) >= 2:
                best_method = max(method_accuracies, key=method_accuracies.get)
                worst_method = min(method_accuracies, key=method_accuracies.get)
                print(
                    f" • Best performing method: {best_method} ({method_accuracies[best_method]:.3f})"
                )
                print(
                    f" • Most challenging method: {worst_method} ({method_accuracies[worst_method]:.3f})"
                )
                print(
                    f" • Performance gap: {method_accuracies[best_method] - method_accuracies[worst_method]:.3f}"
                )
            else:
                for method, accuracy in method_accuracies.items():
                    print(f" • {method} accuracy: {accuracy:.3f}")

        # Flow insights
        print(f"\n🔀 Flow-Specific Insights:")
        if flow_metrics:
            flow_accuracies = {
                flow: metrics["overall_accuracy"]
                for flow, metrics in flow_metrics.items()
            }
            if len(flow_accuracies) >= 2:
                best_flow = max(flow_accuracies, key=flow_accuracies.get)
                worst_flow = min(flow_accuracies, key=flow_accuracies.get)
                print(
                    f" • Best performing flow: {best_flow} ({flow_accuracies[best_flow]:.3f})"
                )
                print(
                    f" • Most challenging flow: {worst_flow} ({flow_accuracies[worst_flow]:.3f})"
                )
                print(
                    f" • Flow performance gap: {flow_accuracies[best_flow] - flow_accuracies[worst_flow]:.3f}"
                )
                # Provide interpretation
                if (
                    "script_based" in flow_accuracies
                    and "knowledge_based" in flow_accuracies
                ):
                    script_acc = flow_accuracies["script_based"]
                    kb_acc = flow_accuracies["knowledge_based"]
                    if script_acc > kb_acc:
                        print(
                            f" • Script-based intents are easier to classify ({script_acc:.3f} vs {kb_acc:.3f})"
                        )
                    elif kb_acc > script_acc:
                        print(
                            f" • Knowledge-based intents are easier to classify ({kb_acc:.3f} vs {script_acc:.3f})"
                        )
                    else:
                        print(
                            f" • Both flows perform similarly ({script_acc:.3f} vs {kb_acc:.3f})"
                        )
            else:
                for flow, accuracy in flow_accuracies.items():
                    print(f" • {flow} flow accuracy: {accuracy:.3f}")

        # Success criteria check
        print(f"\n✅ Success Criteria Check:")
        accuracy = overall["overall_accuracy"]
        if accuracy >= 0.80:
            print(f" 🎉 GOOD: Overall accuracy {accuracy:.3f} >= 0.80")
        elif accuracy >= 0.75:
            print(f" ⚠️ OKAY: Overall accuracy {accuracy:.3f} >= 0.75")
        else:
            print(f" ❌ NEEDS WORK: Overall accuracy {accuracy:.3f} < 0.75")
def main():
    """Run dataset creation and/or evaluation from the command line.

    ``--mode`` selects ``create`` (build the dataset), ``evaluate`` (score an
    existing dataset file) or ``full`` (both). ``GEMINI_API_KEY`` must be set
    in the environment. Errors are printed with a traceback, not re-raised.
    """
    parser = argparse.ArgumentParser(
        description="Simplified ViettelPay Intent Classification Evaluation"
    )
    parser.add_argument(
        "--mode",
        choices=["create", "evaluate", "full"],
        default="full",
        help="Mode: create dataset, evaluate, or full pipeline",
    )
    parser.add_argument(
        "--dataset-path",
        default="evaluation_data/datasets/intent_classification/viettelpay_intent_dataset.json",
        help="Path to intent dataset",
    )
    parser.add_argument(
        "--results-path",
        default="evaluation_data/results/intent_classification/viettelpay_intent_results.json",
        help="Path to save evaluation results",
    )
    parser.add_argument(
        "--conversations-per-chunk",
        type=int,
        default=3,
        help="Number of conversations per chunk (default: 3)",
    )
    parser.add_argument(
        "--knowledge-base-path",
        default="./knowledge_base",
        help="Path to knowledge base",
    )
    args = parser.parse_args()

    # Configuration
    GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
    if not GEMINI_API_KEY:
        print("❌ Please set GEMINI_API_KEY environment variable")
        return

    try:
        dataset = None

        # Step 1: Create dataset if requested (the knowledge base is only
        # needed for this step)
        if args.mode in ["create", "full"]:
            print("🔧 Initializing ViettelPay knowledge base...")
            kb = ViettelKnowledgeBase()
            if not kb.load_knowledge_base(args.knowledge_base_path):
                print(
                    "❌ Failed to load knowledge base. Please run build_database_script.py first."
                )
                return
            print(f"\n🎯 Creating simplified intent classification dataset...")
            creator = IntentDatasetCreator(GEMINI_API_KEY, kb)
            dataset = creator.create_intent_dataset(
                num_conversations_per_chunk=args.conversations_per_chunk,
                save_path=args.dataset_path,
            )

        # Step 2: Evaluate if requested
        if args.mode in ["evaluate", "full"]:
            print(f"\n📊 Evaluating intent classification...")
            # Load dataset if not created in this run
            if args.mode == "evaluate":
                if not os.path.exists(args.dataset_path):
                    print(f"❌ Dataset not found: {args.dataset_path}")
                    return
                with open(args.dataset_path, "r", encoding="utf-8") as f:
                    dataset = json.load(f)

            # Initialize LLM client for intent classification
            print("🤖 Initializing LLM client for intent classification...")
            llm_client = LLMClientFactory.create_client(
                "gemini", api_key=GEMINI_API_KEY, model="gemini-2.0-flash"
            )

            # Run evaluation
            evaluator = IntentClassificationEvaluator(dataset, llm_client)
            results = evaluator.evaluate_intent_classification()
            evaluator.print_evaluation_results(results)

            # Save results
            if args.results_path:
                # The results directory may not exist yet; without this the
                # open() below fails after the whole evaluation has run.
                results_dir = os.path.dirname(args.results_path)
                if results_dir:
                    os.makedirs(results_dir, exist_ok=True)
                with open(args.results_path, "w", encoding="utf-8") as f:
                    json.dump(results, f, ensure_ascii=False, indent=2)
                print(f"\n💾 Results saved to: {args.results_path}")

        print(f"\n✅ Intent classification evaluation completed successfully!")
        print(f"\n💡 Summary improvements made:")
        print(f" • Removed pattern-based generation for simplicity")
        print(f" • Added configurable conversations-per-chunk (default: 3)")
        print(f" • Improved chunk mixing (random 2-3 chunks)")
        print(f" • Enhanced prompts to include non-topic intents")
        print(f" • Added flow-specific analysis (script-based vs knowledge-based)")
    except Exception as e:
        print(f"❌ Error in main execution: {e}")
        import traceback

        traceback.print_exc()


if __name__ == "__main__":
    main()