#!/usr/bin/env python3
"""
Test Geometric Mean Method for Multi-Topic Word Finding

The geometric mean approach: score = (sim1 × sim2 × ... × simN)^(1/N)
This method penalizes low scores more heavily than arithmetic mean,
potentially finding better intersection words.
"""

import os
import sys
import warnings
from typing import Callable, Dict, List, Optional, Sequence, Tuple

import numpy as np

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")

# Model used by main(); kept in one place so the banner and the load agree.
_MODEL_NAME = "sentence-transformers/all-mpnet-base-v2"
# Floor applied to similarities before geometric/harmonic aggregation, which
# are only defined for strictly positive inputs.
_MIN_SIMILARITY = 0.001
# Sharpness of the soft-minimum used by the comparison script.
_SOFT_MIN_BETA = 10.0


def setup_environment():
    """Point the Hugging Face caches at ``../cache-dir`` and import the ML stack.

    Returns:
        The tuple ``(SentenceTransformer, torch)``.

    Exits the process with status 1 (after printing install instructions)
    when either dependency cannot be imported.
    """
    # Set cache directory to root cache-dir folder (absolute path)
    cache_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..', 'cache-dir')
    )
    os.environ['HF_HOME'] = cache_dir
    os.environ['TRANSFORMERS_CACHE'] = cache_dir
    os.environ['SENTENCE_TRANSFORMERS_HOME'] = cache_dir

    try:
        from sentence_transformers import SentenceTransformer
        import torch
    except ImportError as e:
        print(f"❌ Missing dependencies: {e}")
        print("Install with: pip install sentence-transformers torch")
        sys.exit(1)
    return SentenceTransformer, torch


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity between two vectors.

    A zero-norm vector has no direction, so the similarity is defined as
    0.0 in that case instead of dividing by zero and producing NaN.
    """
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return float(np.dot(a, b) / denominator)


def _require_topics(topic_vectors: Sequence[np.ndarray]) -> None:
    """Raise ValueError when there is no topic to compare against."""
    if len(topic_vectors) == 0:
        raise ValueError("topic_vectors must contain at least one vector")


def _topic_similarities(word_vec: np.ndarray,
                        topic_vectors: Sequence[np.ndarray],
                        floor: Optional[float] = None) -> List[float]:
    """Cosine similarity of one word to every topic, optionally floored."""
    sims = [cosine_similarity(word_vec, topic_vec) for topic_vec in topic_vectors]
    if floor is not None:
        sims = [max(sim, floor) for sim in sims]
    return sims


def _geometric_mean(sims: Sequence[float]) -> float:
    """Geometric mean of strictly positive values.

    Computed as exp(mean(log)) so a long list of small similarities cannot
    underflow the way a raw product can.
    """
    return float(np.exp(np.mean(np.log(sims))))


def _harmonic_mean(sims: Sequence[float]) -> float:
    """Harmonic mean of strictly positive values: N / (1/a + 1/b + ...)."""
    return len(sims) / sum(1 / s for s in sims)


def _soft_min(sims: Sequence[float], beta: float) -> float:
    """Soft minimum ``-log(sum(exp(-beta * s))) / beta``.

    The largest exponent is factored out before exponentiating so that a
    large ``beta`` cannot underflow every term to zero (which would make
    the naive formula return ``inf``).
    """
    scaled = -beta * np.asarray(sims, dtype=float)
    peak = scaled.max()
    return float(-(peak + np.log(np.exp(scaled - peak).sum())) / beta)


def _rank(scored: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
    """Sort (word, score) pairs best-first; ties keep insertion order."""
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


def _score_words(topic_vectors: Sequence[np.ndarray],
                 word_vectors: Dict[str, np.ndarray],
                 aggregate: Callable[[List[float]], float],
                 floor: Optional[float] = None) -> List[Tuple[str, float]]:
    """Score every word by aggregating its per-topic similarities."""
    _require_topics(topic_vectors)
    scored = [
        (word, aggregate(_topic_similarities(word_vec, topic_vectors, floor)))
        for word, word_vec in word_vectors.items()
    ]
    return _rank(scored)


def geometric_mean_method(topic_vectors: List[np.ndarray],
                          word_vectors: Dict[str, np.ndarray]) -> List[Tuple[str, float]]:
    """
    Geometric mean method - finds words relevant to ALL topics.
    Score = (similarity_to_topic1 × similarity_to_topic2 × ...)^(1/N)

    Similarities are floored at 0.001 because the geometric mean is only
    defined for positive values.

    Returns:
        (word, score) pairs sorted by descending score.

    Raises:
        ValueError: if ``topic_vectors`` is empty.
    """
    return _score_words(topic_vectors, word_vectors, _geometric_mean,
                        floor=_MIN_SIMILARITY)


def harmonic_mean_method(topic_vectors: List[np.ndarray],
                         word_vectors: Dict[str, np.ndarray]) -> List[Tuple[str, float]]:
    """
    Harmonic mean method - heavily penalizes low scores.
    Score = N / (1/sim1 + 1/sim2 + ... + 1/simN)

    Similarities are floored at 0.001 to keep the reciprocals finite and
    positive.

    Returns:
        (word, score) pairs sorted by descending score.

    Raises:
        ValueError: if ``topic_vectors`` is empty.
    """
    return _score_words(topic_vectors, word_vectors, _harmonic_mean,
                        floor=_MIN_SIMILARITY)


def soft_min_method(topic_vectors: List[np.ndarray],
                    word_vectors: Dict[str, np.ndarray],
                    beta: float = 10.0) -> List[Tuple[str, float]]:
    """
    Soft minimum method - smooth approximation to minimum similarity.
    Score = -log(sum(exp(-beta * sim_i))) / beta

    Args:
        beta: Sharpness; larger values track the true minimum more closely.
            Must be positive.

    Returns:
        (word, score) pairs sorted by descending score.

    Raises:
        ValueError: if ``topic_vectors`` is empty or ``beta`` is not positive.
    """
    if beta <= 0:
        raise ValueError("beta must be positive")
    return _score_words(topic_vectors, word_vectors,
                        lambda sims: _soft_min(sims, beta))


def simple_averaging(topic_vectors: List[np.ndarray],
                     word_vectors: Dict[str, np.ndarray]) -> List[Tuple[str, float]]:
    """Simple averaging method (current approach).

    Each word is scored by its cosine similarity to the mean topic vector.

    Returns:
        (word, score) pairs sorted by descending score.

    Raises:
        ValueError: if ``topic_vectors`` is empty.
    """
    _require_topics(topic_vectors)
    avg_vector = np.mean(topic_vectors, axis=0)
    scored = [
        (word, cosine_similarity(avg_vector, word_vec))
        for word, word_vec in word_vectors.items()
    ]
    return _rank(scored)


def load_sample_words(sample_file: Optional[str] = None,
                      max_words: int = 100) -> List[str]:
    """Load actual sample words from the art-and-books sample file.

    Only words listed under the ``['art', 'books']`` (separated topics)
    header are returned; the combined-topic sections are skipped.

    Args:
        sample_file: Path to the sample file. Defaults to
            ``../samples/art-and-books-sample-words.txt`` relative to this
            script.
        max_words: Upper bound on the number of words returned (limit for
            performance).

    Returns:
        The collected words, or an empty list when the file does not exist.
    """
    if sample_file is None:
        sample_file = os.path.join(os.path.dirname(__file__), '..', 'samples',
                                   'art-and-books-sample-words.txt')

    words: List[str] = []
    if max_words <= 0 or not os.path.exists(sample_file):
        return words

    combined_headers = ("['art and books']", "['words related to art and books']")
    current_section = None
    with open(sample_file, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if line.startswith("['art', 'books']"):
                current_section = "separated"
            elif line.startswith(combined_headers):
                current_section = "combined"
            elif line and not line.startswith('[') and current_section == "separated":
                # Only use the separated topics section for comparison
                words.append(line)
                if len(words) >= max_words:
                    break
    return words


def _print_word_ranks(words: Sequence[str],
                      word_rankings: Dict[str, Dict[str, int]],
                      fallback_rank: int) -> None:
    """Print the 1-based rank of each known word under every method."""
    known_words = next(iter(word_rankings.values()))
    for word in words:
        if word not in known_words:
            continue
        ranks = [
            f"{rankings.get(word, fallback_rank)+1:3d}"
            for rankings in word_rankings.values()
        ]
        print(f" {word:15s}: " + " | ".join(
            f"{method[:10]:>10s}: {rank}"
            for method, rank in zip(word_rankings, ranks)
        ))


def test_multiple_methods(model):
    """Compare all intersection methods.

    ``model`` must provide ``encode(list_of_str)`` returning one embedding
    per input string. Results are printed; nothing is returned.
    """
    print("🔍 Comparing Multiple Intersection Methods")
    print("=" * 70)

    # Load sample words
    sample_words = load_sample_words()
    print(f"Loaded {len(sample_words)} sample words")

    if len(sample_words) < 10:
        print("❌ Not enough sample words loaded")
        return

    # Get topic embeddings
    topics = ["Art", "Books"]
    topic_vectors = list(model.encode(topics))

    # Get word embeddings
    print("Encoding word embeddings...")
    word_embeddings = model.encode(sample_words)
    word_vectors = dict(zip(sample_words, word_embeddings))

    # Test all methods
    methods = [
        ("Simple Averaging", simple_averaging),
        ("Geometric Mean", geometric_mean_method),
        ("Harmonic Mean", harmonic_mean_method),
        ("Soft Minimum", lambda tv, wv: soft_min_method(tv, wv, beta=_SOFT_MIN_BETA)),
    ]

    all_results = {}
    for method_name, method_func in methods:
        print(f"\n📊 {method_name} - Top 15:")
        results = method_func(topic_vectors, word_vectors)
        all_results[method_name] = results
        for i, (word, score) in enumerate(results[:15], 1):
            print(f" {i:2d}. {word:20s}: {score:.4f}")

    # Analyze differences
    print("\n🔄 Method Comparison Analysis:")

    # Map each word to its 0-based rank under every method
    word_rankings = {
        method_name: {word: rank for rank, (word, _) in enumerate(results)}
        for method_name, results in all_results.items()
    }
    fallback_rank = len(sample_words)

    # Look for significant differences among the first 50 sample words
    # (file order, not score order) to keep the report short.
    significant_differences = []
    for word in sample_words[:50]:
        rankings = [ranks.get(word, fallback_rank) for ranks in word_rankings.values()]
        if max(rankings) - min(rankings) >= 10:  # Significant rank difference
            significant_differences.append((word, rankings))

    if significant_differences:
        print(" Words with significant ranking differences:")
        method_names = list(all_results.keys())
        header = f" {'Word':<20s} " + " ".join(f"{name[:8]:>8s}" for name in method_names)
        print(header)
        print(" " + "-" * len(header))
        for word, rankings in significant_differences[:10]:
            rank_str = " ".join(f"{rank+1:8d}" for rank in rankings)
            print(f" {word:<20s} {rank_str}")
    else:
        print(" No significant ranking differences found")

    # Analyze specific problematic and good words
    problematic_words = ["ethology", "guns", "porn", "calibre"]
    good_words = ["illustration", "literature", "painting", "library", "poetry"]

    print("\n🎯 Analysis of Known Problematic Words:")
    _print_word_ranks(problematic_words, word_rankings, fallback_rank)

    print("\n✅ Analysis of Good Intersection Words:")
    _print_word_ranks(good_words, word_rankings, fallback_rank)


def test_individual_similarities(model):
    """Analyze individual topic similarities for key words.

    ``model`` must provide ``encode(list_of_str)`` returning one embedding
    per input string. Results are printed; nothing is returned.
    """
    print("\n\n🔬 Individual Topic Similarity Analysis")
    print("=" * 70)

    # Test specific words
    test_words = ["ethology", "illustration", "literature", "guns", "art", "books", "poetry"]
    topics = ["Art", "Books"]

    # Get embeddings
    topic_embeddings = model.encode(topics)
    word_embeddings = model.encode(test_words)

    print("Individual similarities to each topic:")
    print(f"{'Word':<15s} {'Art':<8s} {'Books':<8s} {'Geo Mean':<10s} {'Harm Mean':<10s} {'Soft Min':<10s}")
    print("-" * 70)

    for word, word_emb in zip(test_words, word_embeddings):
        art_sim = cosine_similarity(word_emb, topic_embeddings[0])
        books_sim = cosine_similarity(word_emb, topic_embeddings[1])

        # Calculate different aggregations with the same helpers the
        # ranking methods use, so the table matches the rankings.
        sims = [art_sim, books_sim]
        floored = [max(s, _MIN_SIMILARITY) for s in sims]
        geo_mean = _geometric_mean(floored)
        harm_mean = _harmonic_mean(floored)
        soft_min = _soft_min(sims, _SOFT_MIN_BETA)

        print(f"{word:<15s} {art_sim:8.4f} {books_sim:8.4f} {geo_mean:10.4f} {harm_mean:10.4f} {soft_min:10.4f}")


# These are script entry points that need a loaded model, not pytest tests;
# opt them out of collection so pytest does not fail looking for a "model"
# fixture.
test_multiple_methods.__test__ = False
test_individual_similarities.__test__ = False


def main():
    """Main test runner"""
    print("🧪 Geometric Mean and Multiple Methods Test")
    print(f"Using production model: {_MODEL_NAME}")
    print("=" * 70)

    # Setup (torch is imported only to verify it is installed)
    SentenceTransformer, _torch = setup_environment()

    # Load model
    model_name = _MODEL_NAME
    print(f"Loading model: {model_name}")
    model = SentenceTransformer(model_name)
    print("✅ Model loaded successfully")

    # Run tests
    test_multiple_methods(model)
    test_individual_similarities(model)

    print("\n" + "=" * 70)
    print("🎯 KEY INSIGHTS:")
    print("1. Geometric mean penalizes words with low similarity to any topic")
    print("2. Harmonic mean is even more aggressive at finding intersections")
    print("3. Soft minimum provides smooth approximation to true intersection")
    print("4. All methods may show similar results if topics are semantically close")
    print("=" * 70)


if __name__ == "__main__":
    main()