Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.cluster import KMeans | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import random | |
| import json | |
| class BaseClassifier: | |
| """Base class for text classifiers""" | |
| def __init__(self): | |
| pass | |
| def classify(self, texts, categories=None): | |
| """ | |
| Classify a list of texts into categories | |
| Args: | |
| texts (list): List of text strings to classify | |
| categories (list, optional): List of category names. If None, categories will be auto-detected | |
| Returns: | |
| list: List of classification results with categories, confidence scores, and explanations | |
| """ | |
| raise NotImplementedError("Subclasses must implement this method") | |
| def _generate_default_categories(self, texts, num_clusters=5): | |
| """ | |
| Generate default categories based on text clustering | |
| Args: | |
| texts (list): List of text strings | |
| num_clusters (int): Number of clusters to generate | |
| Returns: | |
| list: List of category names | |
| """ | |
| # Simple implementation - in real system this would be more sophisticated | |
| default_categories = [f"Category {i+1}" for i in range(num_clusters)] | |
| return default_categories | |
| class TFIDFClassifier(BaseClassifier): | |
| """Classifier using TF-IDF and clustering for fast classification""" | |
| def __init__(self): | |
| super().__init__() | |
| self.vectorizer = TfidfVectorizer( | |
| max_features=1000, | |
| stop_words='english', | |
| ngram_range=(1, 2) | |
| ) | |
| self.model = None | |
| self.feature_names = None | |
| self.categories = None | |
| self.centroids = None | |
| def classify(self, texts, categories=None): | |
| """Classify texts using TF-IDF and clustering""" | |
| # Vectorize the texts | |
| X = self.vectorizer.fit_transform(texts) | |
| self.feature_names = self.vectorizer.get_feature_names_out() | |
| # Auto-detect categories if not provided | |
| if not categories: | |
| num_clusters = min(5, len(texts)) # Don't create more clusters than texts | |
| self.categories = self._generate_default_categories(texts, num_clusters) | |
| else: | |
| self.categories = categories | |
| num_clusters = len(categories) | |
| # Cluster the texts | |
| self.model = KMeans(n_clusters=num_clusters, random_state=42) | |
| clusters = self.model.fit_predict(X) | |
| self.centroids = self.model.cluster_centers_ | |
| # Calculate distances to centroids for confidence | |
| distances = self._calculate_distances(X) | |
| # Prepare results | |
| results = [] | |
| for i, text in enumerate(texts): | |
| cluster_idx = clusters[i] | |
| # Calculate confidence (inverse of distance, normalized) | |
| confidence = self._calculate_confidence(distances[i]) | |
| # Create explanation | |
| explanation = self._generate_explanation(X[i], cluster_idx) | |
| results.append({ | |
| "category": self.categories[cluster_idx], | |
| "confidence": confidence, | |
| "explanation": explanation | |
| }) | |
| return results | |
| def _calculate_distances(self, X): | |
| """Calculate distances from each point to each centroid""" | |
| return np.sqrt(((X.toarray()[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]) ** 2).sum(axis=2)) | |
| def _calculate_confidence(self, distances): | |
| """Convert distances to confidence scores (0-100)""" | |
| min_dist = np.min(distances) | |
| max_dist = np.max(distances) | |
| # Normalize and invert (smaller distance = higher confidence) | |
| if max_dist == min_dist: | |
| return 70 # Default mid-range confidence when all distances are equal | |
| normalized_dist = (distances - min_dist) / (max_dist - min_dist) | |
| min_normalized = np.min(normalized_dist) | |
| # Invert and scale to 50-100 range (TF-IDF is never 100% confident) | |
| confidence = 100 - (min_normalized * 50) | |
| return round(confidence, 1) | |
| def _generate_explanation(self, text_vector, cluster_idx): | |
| """Generate an explanation for the classification""" | |
| # Get the most important features for this cluster | |
| centroid = self.centroids[cluster_idx] | |
| # Get indices of top features for this text | |
| text_array = text_vector.toarray()[0] | |
| top_indices = text_array.argsort()[-5:][::-1] | |
| # Get the feature names for these indices | |
| top_features = [self.feature_names[i] for i in top_indices if text_array[i] > 0] | |
| if not top_features: | |
| return "No significant features identified for this classification." | |
| explanation = f"Classification based on key terms: {', '.join(top_features)}" | |
| return explanation | |
| class LLMClassifier(BaseClassifier): | |
| """Classifier using a Large Language Model for more accurate but slower classification""" | |
| def __init__(self, client, model="gpt-3.5-turbo"): | |
| super().__init__() | |
| self.client = client | |
| self.model = model | |
| def classify(self, texts, categories=None): | |
| """Classify texts using an LLM""" | |
| if not categories: | |
| # First, use LLM to generate appropriate categories | |
| categories = self._suggest_categories(texts) | |
| results = [] | |
| for text in texts: | |
| # Classify each text individually | |
| result = self._classify_text(text, categories) | |
| results.append(result) | |
| return results | |
| def _suggest_categories(self, texts, sample_size=20): | |
| """Use LLM to suggest appropriate categories for the dataset""" | |
| # Take a sample of texts to avoid token limitations | |
| if len(texts) > sample_size: | |
| sample_texts = random.sample(texts, sample_size) | |
| else: | |
| sample_texts = texts | |
| prompt = """ | |
| I have a collection of texts that I need to classify into categories. Here are some examples: | |
| {} | |
| Based on these examples, suggest up 2 to 5 appropriate categories for classification. | |
| Return your answer as a comma-separated list of category names only. | |
| """.format("\n---\n".join(sample_texts)) | |
| try: | |
| response = self.client.chat.completions.create( | |
| model=self.model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.2, | |
| max_tokens=100 | |
| ) | |
| # Parse response to get categories | |
| categories_text = response.choices[0].message.content.strip() | |
| categories = [cat.strip() for cat in categories_text.split(",")] | |
| return categories | |
| except Exception as e: | |
| # Fallback to default categories on error | |
| print(f"Error suggesting categories: {str(e)}") | |
| return self._generate_default_categories(texts) | |
| def _classify_text(self, text, categories): | |
| """Use LLM to classify a single text""" | |
| categories_str = ", ".join(categories) | |
| prompt = f""" | |
| Classify the following text into one of these categories: {categories_str} | |
| Text: {text} | |
| Return your answer in JSON format with these fields: | |
| - category: the chosen category from the list | |
| - confidence: a value between 0 and 100 indicating your confidence in this classification (as a percentage) | |
| - explanation: a brief explanation of why this category was chosen (1-2 sentences) | |
| JSON response: | |
| """ | |
| try: | |
| response = self.client.chat.completions.create( | |
| model=self.model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0, | |
| max_tokens=200 | |
| ) | |
| # Parse JSON response | |
| response_text = response.choices[0].message.content.strip() | |
| result = json.loads(response_text) | |
| # Ensure all required fields are present | |
| if not all(k in result for k in ["category", "confidence", "explanation"]): | |
| raise ValueError("Missing required fields in LLM response") | |
| # Validate category is in the list | |
| if result["category"] not in categories: | |
| result["category"] = categories[0] # Default to first category if invalid | |
| # Validate confidence is a number between 0 and 100 | |
| try: | |
| result["confidence"] = float(result["confidence"]) | |
| if not 0 <= result["confidence"] <= 100: | |
| result["confidence"] = 50 | |
| except: | |
| result["confidence"] = 50 | |
| return result | |
| except json.JSONDecodeError: | |
| # Fall back to simple parsing if JSON fails | |
| category = categories[0] # Default | |
| for cat in categories: | |
| if cat.lower() in response_text.lower(): | |
| category = cat | |
| break | |
| return { | |
| "category": category, | |
| "confidence": 50, | |
| "explanation": f"Classification based on language model analysis. (Note: Structured response parsing failed)" | |
| } | |