import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING
from sklearn.metrics.pairwise import cosine_similarity
from logger_config import config_logger

logger = config_logger(__name__)

if TYPE_CHECKING:
    from tf_data_pipeline import TFDataPipeline


class ResponseQualityChecker:
    """
    Enhanced quality checking that calculates:
      - Relevance between query & responses
      - Diversity among top responses
      - Response length scoring
      - Confidence determination based on multiple thresholds
    """

    def __init__(
        self,
        data_pipeline: 'TFDataPipeline',
        confidence_threshold: float = 0.45,
        diversity_threshold: float = 0.15,
        min_response_length: int = 5,
        similarity_cap: float = 0.85,
    ):
        """
        Args:
            data_pipeline: Reference to TFDataPipeline for encoding
            confidence_threshold: Minimum top_score for a 'confident' result
            diversity_threshold: Minimum required diversity among top responses
            min_response_length: Minimum words for a decent response
            similarity_cap: Cap on pairwise similarity for diversity calc
        """
        self.confidence_threshold = confidence_threshold
        self.diversity_threshold = diversity_threshold
        self.min_response_length = min_response_length
        self.similarity_cap = similarity_cap
        self.data_pipeline = data_pipeline

        # Additional thresholds for more refined checks
        self.thresholds = {
            'relevance': 0.30,      # Slightly relaxed
            'length_score': 0.80,   # Stricter length requirement
            'score_gap': 0.05       # Gap between top scores
        }

    def check_response_quality(
        self,
        query: str,
        responses: List[Tuple[str, float]]
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of a set of ranked responses for a given query.

        Args:
            query: The user's original query
            responses: List of (response_text, score) sorted by descending score

        Returns:
            Dictionary of metrics, including 'is_confident' and others
        """
        if not responses:
            return {
                'response_diversity': 0.0,
                'query_response_relevance': 0.0,
                'is_confident': False,
                'top_score': 0.0,
                'response_length_score': 0.0,
                'top_3_score_gap': 0.0
            }

        # 1) Calculate quality metrics
        metrics = {}
        metrics['response_diversity'] = self.calculate_diversity(responses)
        metrics['query_response_relevance'] = self.calculate_relevance(query, responses)
        metrics['response_length_score'] = self._average_length_score(responses)
        metrics['top_score'] = responses[0][1]
        metrics['top_3_score_gap'] = self._calculate_score_gap([s for _, s in responses], top_n=3)

        # 2) Determine confidence
        metrics['is_confident'] = self._determine_confidence(metrics)

        logger.info(f"Quality metrics: {metrics}")
        return metrics
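
    # Illustrative example (hypothetical numbers) of the dict returned by
    # check_response_quality for a reasonably good match:
    #   {
    #       'response_diversity': 0.62,
    #       'query_response_relevance': 0.41,
    #       'response_length_score': 0.90,
    #       'top_score': 0.55,
    #       'top_3_score_gap': 0.07,
    #       'is_confident': True
    #   }
    # With the default thresholds above, these values pass all three primary
    # checks and all three secondary checks, so 'is_confident' comes out True.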
""" if not responses: return 0.0 # Encode query and responses query_emb = self.data_pipeline.encode_query(query) resp_texts = [r for r, _ in responses] resp_embs = self.data_pipeline.encode_responses(resp_texts) # Normalize embeddings query_emb = query_emb / (np.linalg.norm(query_emb) + 1e-12) resp_norms = np.linalg.norm(resp_embs, axis=1, keepdims=True) + 1e-12 resp_embs = resp_embs / resp_norms # Cosine similarity sims = cosine_similarity([query_emb], resp_embs)[0] # Exponential transform: higher sims remain close to 1, lower sims drop quickly sims = np.exp(sims - 1.0) # Weighted average: give heavier weighting to higher-ranked items weights = np.exp(-np.arange(len(sims)) / 2.0) weighted_avg = np.average(sims, weights=weights) return float(weighted_avg) def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float: """ Calculate how 'different' the top responses are from each other. Diversity = 1 - avg_cosine_similarity (capped). """ if len(responses) < 2: return 1.0 # Single response is trivially 'unique' resp_texts = [r for r, _ in responses] embs = self.data_pipeline.encode_responses(resp_texts) # Pairwise similarity sim_matrix = cosine_similarity(embs, embs) np.fill_diagonal(sim_matrix, 0.0) # Cap similarity to avoid outliers sim_matrix = np.minimum(sim_matrix, self.similarity_cap) # Mean off-diagonal similarity sum_sims = np.sum(sim_matrix) num_pairs = len(resp_texts) * (len(resp_texts) - 1) avg_sim = sum_sims / num_pairs if num_pairs > 0 else 0.0 # Invert to get diversity return 1.0 - avg_sim def _determine_confidence(self, metrics: Dict[str, float]) -> bool: """ Decide if we're 'confident' based on multiple metric thresholds. """ primary_conditions = [ metrics['top_score'] >= self.confidence_threshold, metrics['response_diversity'] >= self.diversity_threshold, metrics['response_length_score'] >= self.thresholds['length_score'] ] secondary_conditions = [ metrics['query_response_relevance'] >= self.thresholds['relevance'], metrics['top_3_score_gap'] >= self.thresholds['score_gap'], metrics['top_score'] >= (self.confidence_threshold + 0.05) # Extra buffer ] # Must pass all primary checks, and at least 2 of the 3 secondary return all(primary_conditions) and (sum(secondary_conditions) >= 2) def _average_length_score(self, responses: List[Tuple[str, float]]) -> float: """ Compute an average length score across all responses. """ length_scores = [] for response, _ in responses: length_scores.append(self._length_score(response)) return float(np.mean(length_scores)) if length_scores else 0.0 def _length_score(self, text: str) -> float: """ Calculate how well the text meets our length requirement. Scores 1.0 if text is >= min_response_length and not too long, else it scales down. """ words = len(text.split()) if words < self.min_response_length: return words / float(self.min_response_length) elif words > 60: return max(0.5, 60.0 / words) # Slight penalty for very long return 1.0 def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float: """ Calculate the average gap between consecutive scores in the top N. 
""" if len(scores) < 2: return 0.0 top_n = min(len(scores), top_n) gaps = [] for i in range(top_n - 1): gaps.append(scores[i] - scores[i + 1]) return float(np.mean(gaps)) if gaps else 0.0 # import numpy as np # from typing import List, Tuple, Dict, Any, TYPE_CHECKING # from sklearn.metrics.pairwise import cosine_similarity # from logger_config import config_logger # logger = config_logger(__name__) # if TYPE_CHECKING: # from tf_data_pipeline import TFDataPipeline # class ResponseQualityChecker: # """Enhanced quality checking with dynamic thresholds.""" # def __init__( # self, # data_pipeline: 'TFDataPipeline', # confidence_threshold: float = 0.4, # diversity_threshold: float = 0.15, # min_response_length: int = 5, # similarity_cap: float = 0.85 # Renamed from max_similarity_ratio and used in diversity calc # ): # self.confidence_threshold = confidence_threshold # self.diversity_threshold = diversity_threshold # self.min_response_length = min_response_length # self.similarity_cap = similarity_cap # self.data_pipeline = data_pipeline # Reference to TFDataPipeline # # Dynamic thresholds based on response patterns # self.thresholds = { # 'relevance': 0.35, # 'length_score': 0.85, # 'score_gap': 0.04 # } # def check_response_quality( # self, # query: str, # responses: List[Tuple[str, float]] # ) -> Dict[str, Any]: # """ # Evaluate the quality of responses based on various metrics. # Args: # query: The user's query # responses: List of (response_text, score) tuples # Returns: # Dict containing quality metrics and confidence assessment # """ # if not responses: # return { # 'response_diversity': 0.0, # 'query_response_relevance': 0.0, # 'is_confident': False, # 'top_score': 0.0, # 'response_length_score': 0.0, # 'top_3_score_gap': 0.0 # } # # Calculate core metrics # metrics = { # 'response_diversity': self.calculate_diversity(responses), # 'query_response_relevance': self.calculate_relevance(query, responses), # 'response_length_score': np.mean([ # self._calculate_length_score(response) for response, _ in responses # ]), # 'top_score': responses[0][1], # 'top_3_score_gap': self._calculate_score_gap([score for _, score in responses], top_n=3) # } # # Determine confidence using thresholds # metrics['is_confident'] = self._determine_confidence(metrics) # logger.info(f"Quality metrics: {metrics}") # return metrics # def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float: # """Calculate relevance with stricter scoring.""" # if not responses: # return 0.0 # query_embedding = self.data_pipeline.encode_query(query) # response_texts = [resp for resp, _ in responses] # response_embeddings = self.data_pipeline.encode_responses(response_texts) # # Normalize embeddings # query_embedding = query_embedding / np.linalg.norm(query_embedding) # response_embeddings = response_embeddings / np.linalg.norm(response_embeddings, axis=1)[:, np.newaxis] # # Compute similarities with exponential decay for far matches # similarities = cosine_similarity([query_embedding], response_embeddings)[0] # similarities = np.exp(similarities - 1) # Penalize lower similarities more strongly # # Apply stronger position weighting # weights = np.exp(-np.arange(len(similarities)) / 2) # return float(np.average(similarities, weights=weights)) # def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float: # """Calculate diversity with length normalization and similarity capping.""" # if not responses: # return 0.0 # response_texts = [resp for resp, _ in responses] # embeddings = 