classifieur / process.py
simondh's picture
new endpoint
8bef8d4
import logging
import time
import traceback
import asyncio
from sklearn.feature_extraction.text import TfidfVectorizer
from typing import Optional, List, Dict, Any, Tuple, Union
import pandas as pd
from pathlib import Path
import json
from classifiers import TFIDFClassifier, LLMClassifier
from utils import load_data, validate_results, get_sample_texts
from client import get_client
from prompts import VALIDATION_ANALYSIS_PROMPT, CATEGORY_IMPROVEMENT_PROMPT
def update_api_key(api_key: str) -> Tuple[bool, str]:
"""Update the OpenAI API key"""
from client import initialize_client
return initialize_client(api_key)
async def process_file_async(
file: Union[str, Path],
text_columns: List[str],
categories: Optional[str],
classifier_type: str,
show_explanations: bool
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
"""Async version of process_file"""
# Initialize result_df and validation_report
result_df: Optional[pd.DataFrame] = None
validation_report: Optional[str] = None
try:
# Load data from file
if isinstance(file, str):
df: pd.DataFrame = load_data(file)
else:
df: pd.DataFrame = load_data(file.name)
if not text_columns:
return None, "Please select at least one text column"
# Check if all selected columns exist
missing_columns: List[str] = [col for col in text_columns if col not in df.columns]
if missing_columns:
return (
None,
f"Columns not found in the file: {', '.join(missing_columns)}. Available columns: {', '.join(df.columns)}",
)
# Combine text from selected columns
texts: List[str] = []
for _, row in df.iterrows():
combined_text: str = " ".join(str(row[col]) for col in text_columns)
texts.append(combined_text)
# Parse categories if provided
category_list: List[str] = []
if categories:
category_list = [cat.strip() for cat in categories.split(",")]
# Select classifier based on data size and user choice
num_texts: int = len(texts)
# If no specific model is chosen, select the most appropriate one
if classifier_type == "auto":
if num_texts <= 500:
classifier_type = "gpt4"
elif num_texts <= 1000:
classifier_type = "gpt35"
elif num_texts <= 5000:
classifier_type = "hybrid"
else:
classifier_type = "tfidf"
# Get the client instance
client = get_client()
# Initialize appropriate classifier
if classifier_type == "tfidf":
classifier: TFIDFClassifier = TFIDFClassifier()
results: List[Dict[str, Any]] = classifier.classify(texts, category_list)
elif classifier_type in ["gpt35", "gpt4"]:
if client is None:
return (
None,
"Erreur : Le client API n'est pas initialisé. Veuillez configurer une clé API valide dans l'onglet 'Setup'.",
)
model: str = "gpt-3.5-turbo" if classifier_type == "gpt35" else "gpt-4"
classifier: LLMClassifier = LLMClassifier(client=client, model=model)
results: List[Dict[str, Any]] = await classifier.classify_async(texts, category_list)
else: # hybrid
if client is None:
return (
None,
"Erreur : Le client API n'est pas initialisé. Veuillez configurer une clé API valide dans l'onglet 'Setup'.",
)
# First pass with TF-IDF
tfidf_classifier: TFIDFClassifier = TFIDFClassifier()
tfidf_results: List[Dict[str, Any]] = tfidf_classifier.classify(texts, category_list)
# Second pass with LLM for low confidence results
llm_classifier: LLMClassifier = LLMClassifier(client=client, model="gpt-3.5-turbo")
results: List[Optional[Dict[str, Any]]] = []
low_confidence_texts: List[str] = []
low_confidence_indices: List[int] = []
for i, (text, tfidf_result) in enumerate(zip(texts, tfidf_results)):
if tfidf_result["confidence"] < 70: # If confidence is below 70%
low_confidence_texts.append(text)
low_confidence_indices.append(i)
results.append(None) # Placeholder
else:
results.append(tfidf_result)
if low_confidence_texts:
llm_results: List[Dict[str, Any]] = await llm_classifier.classify_async(
low_confidence_texts, category_list
)
for idx, llm_result in zip(low_confidence_indices, llm_results):
results[idx] = llm_result
# Create results dataframe
result_df = df.copy()
result_df["Category"] = [r["category"] for r in results]
result_df["Confidence"] = [r["confidence"] for r in results]
if show_explanations:
result_df["Explanation"] = [r["explanation"] for r in results]
# Validate results using LLM
validation_report = validate_results(result_df, text_columns, client)
return result_df, validation_report
except Exception as e:
error_traceback: str = traceback.format_exc()
return None, f"Error: {str(e)}\n{error_traceback}"
def process_file(
file: Union[str, Path],
text_columns: List[str],
categories: Optional[str],
classifier_type: str,
show_explanations: bool
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
"""Synchronous wrapper for process_file_async"""
return asyncio.run(process_file_async(file, text_columns, categories, classifier_type, show_explanations))
def export_results(df: pd.DataFrame, format_type: str) -> Optional[str]:
"""Export results to a file and return the file path for download"""
if df is None:
return None
# Create a temporary file
import tempfile
import os
# Create a temporary directory if it doesn't exist
temp_dir: str = "temp_exports"
os.makedirs(temp_dir, exist_ok=True)
# Generate a unique filename
timestamp: str = time.strftime("%Y%m%d-%H%M%S")
filename: str = f"classification_results_{timestamp}"
if format_type == "excel":
file_path: str = os.path.join(temp_dir, f"{filename}.xlsx")
df.to_excel(file_path, index=False)
else:
file_path: str = os.path.join(temp_dir, f"{filename}.csv")
df.to_csv(file_path, index=False)
return file_path
async def improve_classification(
df: pd.DataFrame,
validation_report: str,
text_columns: List[str],
categories: str,
classifier_type: str,
show_explanations: bool,
file: Union[str, Path]
) -> Tuple[Optional[pd.DataFrame], Optional[str], bool, List[str]]:
"""
Improve classification based on validation report
Args:
df (pd.DataFrame): Current classification results
validation_report (str): Validation report from previous classification
text_columns (List[str]): List of text column names
categories (str): Comma-separated list of categories
classifier_type (str): Type of classifier to use
show_explanations (bool): Whether to show explanations
file (Union[str, Path]): Path to the input file
Returns:
Tuple[Optional[pd.DataFrame], Optional[str], bool, List[str]]:
- Improved dataframe
- New validation report
- Whether improvement was successful
- Updated categories
"""
if df is None or not validation_report:
return None, validation_report, False, []
try:
client = get_client()
if not client:
return None, "Error: API client not initialized", False, []
# Extract insights from validation report
prompt = VALIDATION_ANALYSIS_PROMPT.format(
validation_report=validation_report,
current_categories=categories,
)
response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0,
max_tokens=300,
)
)
improvements = json.loads(response.choices[0].message.content.strip())
current_categories = [cat.strip() for cat in categories.split(",")]
# If new categories are needed, suggest them based on the data
if improvements.get("new_categories_needed", False):
# Get sample texts for category suggestion
sample_texts = get_sample_texts(df, text_columns, sample_size=10)
category_prompt = CATEGORY_IMPROVEMENT_PROMPT.format(
current_categories=", ".join(current_categories),
analysis=improvements.get("analysis", ""),
sample_texts="\n---\n".join(sample_texts)
)
category_response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": category_prompt}],
temperature=0,
max_tokens=100,
)
)
new_categories = [
cat.strip()
for cat in category_response.choices[0].message.content.strip().split(",")
]
# Combine current and new categories
all_categories = current_categories + new_categories
categories = ",".join(all_categories)
# Process with improved parameters
improved_df, new_validation = await process_file_async(
file,
text_columns,
categories,
classifier_type,
show_explanations,
)
return improved_df, new_validation, True, all_categories if improvements.get("new_categories_needed", False) else current_categories
except Exception as e:
print(f"Error in improvement process: {str(e)}")
return df, validation_report, False, current_categories