|
|
|
""" |
|
Unified Thematic Word Generator using WordFreq + SentenceTransformers |
|
|
|
Eliminates vocabulary redundancy by using WordFreq as the single vocabulary source |
|
for both word lists and frequency data, with all-mpnet-base-v2 for embeddings. |
|
|
|
Features: |
|
- Single vocabulary source (WordFreq 319K words vs previous 3 separate sources) |
|
- Unified filtering for crossword-suitable words |
|
- 10-tier frequency classification system |
|
- Compatible with crossword backend services |
|
- Comprehensive modern vocabulary with proper frequency data |
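
Typical usage (illustrative sketch; the topic, limits, and output shown are example values):

    generator = UnifiedThematicWordGenerator(vocab_size_limit=50000)
    generator.initialize()
    words = generator.generate_thematic_words("ocean", num_words=10)
    # -> list of (word, similarity_score, frequency_tier) tuples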
|
""" |
|
|
|
import os |
|
|
import pickle |
|
import numpy as np |
|
import logging |
|
import asyncio |
|
|
from typing import List, Tuple, Optional, Dict, Set, Any, Union
|
from sentence_transformers import SentenceTransformer |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
from sklearn.cluster import KMeans |
|
from datetime import datetime |
|
import time |
|
from collections import Counter |
|
from pathlib import Path |
|
|
|
|
|
from wordfreq import word_frequency, zipf_frequency, top_n_list |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s', |
|
datefmt='%Y-%m-%d %H:%M:%S' |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
def get_timestamp(): |
|
return datetime.now().strftime("%H:%M:%S") |
|
|
|
def get_datetimestamp(): |
|
return datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
|
class VocabularyManager: |
|
""" |
|
Centralized vocabulary management using WordFreq as the single source. |
|
Handles loading, filtering, caching, and frequency data generation. |
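
    Typical usage (illustrative):
        manager = VocabularyManager(vocab_size_limit=50000)
        vocabulary, frequencies = manager.load_vocabulary()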
|
""" |
|
|
|
def __init__(self, cache_dir: Optional[str] = None, vocab_size_limit: Optional[int] = None): |
|
"""Initialize vocabulary manager. |
|
|
|
Args: |
|
cache_dir: Directory for caching vocabulary and embeddings |
|
vocab_size_limit: Maximum vocabulary size (None for full WordFreq vocabulary) |
|
""" |
|
if cache_dir is None: |
|
cache_dir = os.path.join(os.path.dirname(__file__), 'model_cache') |
|
|
|
self.cache_dir = Path(cache_dir) |
|
self.cache_dir.mkdir(exist_ok=True) |
|
|
|
|
|
        # Default vocabulary size; can be overridden via the MAX_VOCABULARY_SIZE environment variable
        self.vocab_size_limit = vocab_size_limit or int(os.getenv("MAX_VOCABULARY_SIZE", "100000"))
|
|
|
|
|
self.vocab_cache_path = self.cache_dir / f"unified_vocabulary_{self.vocab_size_limit}.pkl" |
|
self.frequency_cache_path = self.cache_dir / f"unified_frequencies_{self.vocab_size_limit}.pkl" |
|
|
|
|
|
self.vocabulary: List[str] = [] |
|
self.word_frequencies: Counter = Counter() |
|
self.is_loaded = False |
|
|
|
def load_vocabulary(self) -> Tuple[List[str], Counter]: |
|
"""Load vocabulary and frequency data, with caching.""" |
|
if self.is_loaded: |
|
return self.vocabulary, self.word_frequencies |
|
|
|
|
|
if self._load_from_cache(): |
|
logger.info(f"✅ Loaded vocabulary from cache: {len(self.vocabulary):,} words") |
|
self.is_loaded = True |
|
return self.vocabulary, self.word_frequencies |
|
|
|
|
|
logger.info("🔄 Generating vocabulary from WordFreq...") |
|
self._generate_vocabulary_from_wordfreq() |
|
|
|
|
|
self._save_to_cache() |
|
|
|
self.is_loaded = True |
|
return self.vocabulary, self.word_frequencies |
|
|
|
def _load_from_cache(self) -> bool: |
|
"""Load vocabulary and frequencies from cache.""" |
|
try: |
|
if self.vocab_cache_path.exists() and self.frequency_cache_path.exists(): |
|
logger.info("📦 Loading vocabulary from cache...") |
|
|
|
with open(self.vocab_cache_path, 'rb') as f: |
|
self.vocabulary = pickle.load(f) |
|
|
|
with open(self.frequency_cache_path, 'rb') as f: |
|
self.word_frequencies = pickle.load(f) |
|
|
|
return True |
|
except Exception as e: |
|
logger.warning(f"⚠️ Cache loading failed: {e}") |
|
|
|
return False |
|
|
|
def _save_to_cache(self): |
|
"""Save vocabulary and frequencies to cache.""" |
|
try: |
|
logger.info("💾 Saving vocabulary to cache...") |
|
|
|
with open(self.vocab_cache_path, 'wb') as f: |
|
pickle.dump(self.vocabulary, f) |
|
|
|
with open(self.frequency_cache_path, 'wb') as f: |
|
pickle.dump(self.word_frequencies, f) |
|
|
|
logger.info("✅ Vocabulary cached successfully") |
|
except Exception as e: |
|
logger.warning(f"⚠️ Cache saving failed: {e}") |
|
|
|
def _generate_vocabulary_from_wordfreq(self): |
|
"""Generate filtered vocabulary from WordFreq database.""" |
|
logger.info(f"📚 Fetching top {self.vocab_size_limit:,} words from WordFreq...") |
|
|
|
|
|
raw_words = top_n_list('en', self.vocab_size_limit * 2, wordlist='large') |
|
logger.info(f"📥 Retrieved {len(raw_words):,} raw words from WordFreq") |
|
|
|
|
|
filtered_words = [] |
|
frequency_data = Counter() |
|
|
|
logger.info("🔍 Applying crossword filtering...") |
|
for word in raw_words: |
|
if self._is_crossword_suitable(word): |
|
filtered_words.append(word.lower()) |
|
|
|
|
|
                try:
                    freq = word_frequency(word, 'en', wordlist='large')
                    if freq > 0:
                        # Store frequencies as integer counts per billion for Counter compatibility
                        frequency_data[word.lower()] = int(freq * 1e9)
                except Exception:
                    frequency_data[word.lower()] = 1
|
|
|
if len(filtered_words) >= self.vocab_size_limit: |
|
break |
|
|
|
|
|
self.vocabulary = sorted(list(set(filtered_words))) |
|
self.word_frequencies = frequency_data |
|
|
|
logger.info(f"✅ Generated filtered vocabulary: {len(self.vocabulary):,} words") |
|
logger.info(f"📊 Frequency data coverage: {len(self.word_frequencies):,} words") |
|
|
|
def _is_crossword_suitable(self, word: str) -> bool: |
|
"""Check if word is suitable for crosswords.""" |
|
word = word.lower().strip() |
|
|
|
|
|
if len(word) < 3 or len(word) > 12: |
|
return False |
|
|
|
|
|
if not word.isalpha(): |
|
return False |
|
|
|
|
|
boring_words = { |
|
'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'this', 'that', |
|
'with', 'from', 'they', 'were', 'been', 'have', 'their', 'said', 'each', |
|
'which', 'what', 'there', 'will', 'more', 'when', 'some', 'like', 'into', |
|
'time', 'very', 'only', 'has', 'had', 'who', 'its', 'now', 'find', 'long', |
|
'down', 'day', 'did', 'get', 'come', 'made', 'may', 'part' |
|
} |
|
|
|
if word in boring_words: |
|
return False |
|
|
|
|
|
if len(word) > 4 and word.endswith('s') and not word.endswith(('ss', 'us', 'is')): |
|
return False |
|
|
|
|
|
if len(set(word)) < len(word) * 0.6: |
|
return False |
|
|
|
return True |
|
|
|
|
|
class UnifiedThematicWordGenerator: |
|
""" |
|
Unified thematic word generator using WordFreq vocabulary and all-mpnet-base-v2 embeddings. |
|
|
|
Compatible with both hack tools and crossword backend services. |
|
Eliminates vocabulary redundancy by using single source for everything. |
|
""" |
|
|
|
def __init__(self, cache_dir: Optional[str] = None, model_name: str = 'all-mpnet-base-v2', |
|
vocab_size_limit: Optional[int] = None): |
|
"""Initialize the unified thematic word generator. |
|
|
|
Args: |
|
cache_dir: Directory to cache model and embeddings |
|
model_name: Sentence transformer model to use |
|
vocab_size_limit: Maximum vocabulary size (None for 100K default) |
|
""" |
|
if cache_dir is None: |
|
cache_dir = os.path.join(os.path.dirname(__file__), 'model_cache') |
|
|
|
self.cache_dir = Path(cache_dir) |
|
self.cache_dir.mkdir(exist_ok=True) |
|
|
|
self.model_name = model_name |
|
self.vocab_size_limit = vocab_size_limit |
|
|
|
|
|
        # Selection tunables (each overridable via an environment variable):
        #   SIMILARITY_TEMPERATURE - softmax temperature for probabilistic word selection
        #   USE_SOFTMAX_SELECTION  - toggle softmax sampling vs. plain top-N by similarity
        #   DIFFICULTY_WEIGHT      - weight of frequency alignment in the composite score
        self.similarity_temperature = float(os.getenv("SIMILARITY_TEMPERATURE", "0.7"))
        self.use_softmax_selection = os.getenv("USE_SOFTMAX_SELECTION", "true").lower() == "true"
        self.difficulty_weight = float(os.getenv("DIFFICULTY_WEIGHT", "0.3"))
|
|
|
|
|
self.vocab_manager = VocabularyManager(cache_dir, vocab_size_limit) |
|
self.model: Optional[SentenceTransformer] = None |
|
|
|
|
|
self.vocabulary: List[str] = [] |
|
self.word_frequencies: Counter = Counter() |
|
self.vocab_embeddings: Optional[np.ndarray] = None |
|
self.frequency_tiers: Dict[str, str] = {} |
|
self.tier_descriptions: Dict[str, str] = {} |
|
self.word_percentiles: Dict[str, float] = {} |
|
|
|
|
|
vocab_hash = f"{model_name}_{vocab_size_limit or 100000}" |
|
self.embeddings_cache_path = self.cache_dir / f"unified_embeddings_{vocab_hash}.npy" |
|
|
|
self.is_initialized = False |
|
|
|
def initialize(self): |
|
"""Initialize the generator (synchronous version).""" |
|
if self.is_initialized: |
|
return |
|
|
|
start_time = time.time() |
|
logger.info(f"🚀 Initializing Unified Thematic Word Generator...") |
|
|
|
|
|
self.vocabulary, self.word_frequencies = self.vocab_manager.load_vocabulary() |
|
|
|
|
|
self.frequency_tiers = self._create_frequency_tiers() |
|
|
|
|
|
logger.info(f"🤖 Loading embedding model: {self.model_name}") |
|
model_start = time.time() |
|
self.model = SentenceTransformer( |
|
f'sentence-transformers/{self.model_name}', |
|
cache_folder=str(self.cache_dir) |
|
) |
|
model_time = time.time() - model_start |
|
logger.info(f"✅ Model loaded in {model_time:.2f}s") |
|
|
|
|
|
self.vocab_embeddings = self._load_or_create_embeddings() |
|
|
|
self.is_initialized = True |
|
total_time = time.time() - start_time |
|
logger.info(f"🎉 Unified generator initialized in {total_time:.2f}s") |
|
logger.info(f"📊 Vocabulary: {len(self.vocabulary):,} words") |
|
logger.info(f"📈 Frequency data: {len(self.word_frequencies):,} words") |
|
logger.info(f"🎲 Softmax selection: {'ENABLED' if self.use_softmax_selection else 'DISABLED'}") |
|
if self.use_softmax_selection: |
|
logger.info(f"🌡️ Similarity temperature: {self.similarity_temperature}") |
|
|
|
async def initialize_async(self): |
|
"""Initialize the generator (async version for backend compatibility).""" |
|
return self.initialize() |
|
|
|
def _load_or_create_embeddings(self) -> np.ndarray: |
|
"""Load embeddings from cache or create them.""" |
|
|
|
if self.embeddings_cache_path.exists(): |
|
try: |
|
logger.info("📦 Loading embeddings from cache...") |
|
embeddings = np.load(self.embeddings_cache_path) |
|
logger.info(f"✅ Loaded embeddings: {embeddings.shape}") |
|
return embeddings |
|
except Exception as e: |
|
logger.warning(f"⚠️ Embeddings cache loading failed: {e}") |
|
|
|
|
|
logger.info("🔄 Creating embeddings for vocabulary...") |
|
start_time = time.time() |
|
|
|
|
|
batch_size = 512 |
|
all_embeddings = [] |
|
|
|
for i in range(0, len(self.vocabulary), batch_size): |
|
batch_words = self.vocabulary[i:i + batch_size] |
|
batch_embeddings = self.model.encode( |
|
batch_words, |
|
convert_to_tensor=False, |
|
show_progress_bar=i == 0 |
|
) |
|
all_embeddings.append(batch_embeddings) |
|
|
|
if i % (batch_size * 10) == 0: |
|
logger.info(f"📊 Embeddings progress: {i:,}/{len(self.vocabulary):,}") |
|
|
|
embeddings = np.vstack(all_embeddings) |
|
embedding_time = time.time() - start_time |
|
logger.info(f"✅ Created embeddings in {embedding_time:.2f}s: {embeddings.shape}") |
|
|
|
|
|
try: |
|
np.save(self.embeddings_cache_path, embeddings) |
|
logger.info("💾 Embeddings cached successfully") |
|
except Exception as e: |
|
logger.warning(f"⚠️ Embeddings cache saving failed: {e}") |
|
|
|
return embeddings |
|
|
|
def _create_frequency_tiers(self) -> Dict[str, str]: |
|
"""Create 10-tier frequency classification system and calculate word percentiles.""" |
|
if not self.word_frequencies: |
|
return {} |
|
|
|
logger.info("📊 Creating frequency tiers and percentiles...") |
|
|
|
tiers = {} |
|
percentiles = {} |
|
|
|
|
|
all_counts = list(self.word_frequencies.values()) |
|
all_counts.sort(reverse=True) |
|
|
|
|
|
|
|
count_to_rank = {} |
|
for rank, count in enumerate(all_counts): |
|
if count not in count_to_rank: |
|
count_to_rank[count] = rank |
|
|
|
|
|
tier_definitions = [ |
|
("tier_1_ultra_common", 0.999, "Ultra Common (Top 0.1%)"), |
|
("tier_2_extremely_common", 0.995, "Extremely Common (Top 0.5%)"), |
|
("tier_3_very_common", 0.99, "Very Common (Top 1%)"), |
|
("tier_4_highly_common", 0.97, "Highly Common (Top 3%)"), |
|
("tier_5_common", 0.92, "Common (Top 8%)"), |
|
("tier_6_moderately_common", 0.85, "Moderately Common (Top 15%)"), |
|
("tier_7_somewhat_uncommon", 0.70, "Somewhat Uncommon (Top 30%)"), |
|
("tier_8_uncommon", 0.50, "Uncommon (Top 50%)"), |
|
("tier_9_rare", 0.25, "Rare (Top 75%)"), |
|
("tier_10_very_rare", 0.0, "Very Rare (Bottom 25%)") |
|
] |
|
|
|
|
|
thresholds = [] |
|
for tier_name, percentile, description in tier_definitions: |
|
if percentile > 0: |
|
idx = int((1 - percentile) * len(all_counts)) |
|
threshold = all_counts[min(idx, len(all_counts) - 1)] |
|
else: |
|
threshold = 0 |
|
thresholds.append((tier_name, threshold, description)) |
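
        # Example (illustrative): with 100,000 frequency counts, the "Top 1%" tier uses
        # the count at sorted index int((1 - 0.99) * 100000) = 1000 as its threshold, so
        # any word whose count is at or above that value is at least tier_3_very_common.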
|
|
|
|
|
self.tier_descriptions = {name: desc for name, _, desc in thresholds} |
|
|
|
|
|
for word, count in self.word_frequencies.items(): |
|
|
|
rank = count_to_rank.get(count, len(all_counts) - 1) |
|
percentile = 1.0 - (rank / len(all_counts)) |
|
percentiles[word] = percentile |
|
|
|
|
|
assigned = False |
|
for tier_name, threshold, description in thresholds: |
|
if count >= threshold: |
|
tiers[word] = tier_name |
|
assigned = True |
|
break |
|
|
|
if not assigned: |
|
tiers[word] = "tier_10_very_rare" |
|
|
|
|
|
for word in self.vocabulary: |
|
if word not in tiers: |
|
tiers[word] = "tier_10_very_rare" |
|
percentiles[word] = 0.0 |
|
|
|
|
|
self.word_percentiles = percentiles |
|
|
|
|
|
tier_counts = Counter(tiers.values()) |
|
logger.info(f"✅ Created frequency tiers:") |
|
for tier_name, count in sorted(tier_counts.items()): |
|
desc = self.tier_descriptions.get(tier_name, tier_name) |
|
logger.info(f" {desc}: {count:,} words") |
|
|
|
|
|
percentile_values = list(percentiles.values()) |
|
if percentile_values: |
|
avg_percentile = np.mean(percentile_values) |
|
logger.info(f"📈 Percentile statistics: avg={avg_percentile:.3f}, range=0.000-1.000") |
|
|
|
return tiers |
|
|
|
    def generate_thematic_words(self,
                                inputs: Union[str, List[str]],
                                num_words: int = 20,
                                min_similarity: float = 0.3,
                                multi_theme: bool = False,
                                difficulty: str = "medium") -> List[Tuple[str, float, str]]:
|
"""Generate thematically related words from input seeds. |
|
|
|
Args: |
|
inputs: Single string, or list of words/sentences as theme seeds |
|
num_words: Number of words to return |
|
min_similarity: Minimum similarity threshold |
|
multi_theme: Whether to detect and use multiple themes |
|
difficulty: Difficulty level ("easy", "medium", "hard") for frequency-aware selection |
|
|
|
Returns: |
|
List of (word, similarity_score, frequency_tier) tuples |
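
        Example (illustrative; actual output depends on the model and vocabulary):
            >>> generator.generate_thematic_words(["ocean", "coral"], num_words=3)
            [('reef', 0.61, 'tier_6_moderately_common'),
             ('lagoon', 0.58, 'tier_7_somewhat_uncommon'),
             ('plankton', 0.55, 'tier_8_uncommon')]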
|
""" |
|
if not self.is_initialized: |
|
self.initialize() |
|
|
|
logger.info(f"🎯 Generating {num_words} thematic words") |
|
|
|
|
|
if isinstance(inputs, str): |
|
inputs = [inputs] |
|
|
|
if not inputs: |
|
return [] |
|
|
|
|
|
clean_inputs = [inp.strip().lower() for inp in inputs if inp.strip()] |
|
if not clean_inputs: |
|
return [] |
|
|
|
logger.info(f"📝 Input themes: {clean_inputs}") |
|
logger.info(f"📊 Difficulty level: {difficulty} (using frequency-aware selection)") |
|
|
|
|
|
|
|
auto_multi_theme = len(clean_inputs) > 2 |
|
final_multi_theme = multi_theme or auto_multi_theme |
|
|
|
logger.info(f"🔍 Multi-theme detection: {final_multi_theme} (auto: {auto_multi_theme}, manual: {multi_theme})") |
|
|
|
if final_multi_theme: |
|
theme_vectors = self._detect_multiple_themes(clean_inputs) |
|
logger.info(f"📊 Detected {len(theme_vectors)} themes") |
|
else: |
|
theme_vectors = [self._compute_theme_vector(clean_inputs)] |
|
logger.info("📊 Using single theme vector") |
|
|
|
|
|
all_similarities = np.zeros(len(self.vocabulary)) |
|
|
|
for theme_vector in theme_vectors: |
|
|
|
similarities = cosine_similarity(theme_vector, self.vocab_embeddings)[0] |
|
all_similarities += similarities / len(theme_vectors) |
|
|
|
logger.info("✅ Computed semantic similarities") |
|
|
|
|
|
top_indices = np.argsort(all_similarities)[::-1] |
|
|
|
|
|
results = [] |
|
input_words_set = set(clean_inputs) |
|
|
|
for idx in top_indices: |
|
if len(results) >= num_words * 3: |
|
break |
|
|
|
similarity_score = all_similarities[idx] |
|
word = self.vocabulary[idx] |
|
|
|
|
|
if similarity_score < min_similarity: |
|
continue |
|
|
|
|
|
if word.lower() in input_words_set: |
|
continue |
|
|
|
word_tier = self.frequency_tiers.get(word, "tier_10_very_rare") |
|
|
|
results.append((word, similarity_score, word_tier)) |
|
|
|
|
|
if self.use_softmax_selection and len(results) > num_words: |
|
logger.info(f"🎲 Using difficulty-aware softmax selection (temperature: {self.similarity_temperature})") |
|
final_results = self._softmax_weighted_selection(results, num_words, difficulty=difficulty) |
|
|
|
final_results.sort(key=lambda x: x[1], reverse=True) |
|
else: |
|
logger.info("📊 Using traditional similarity-based sorting") |
|
|
|
results.sort(key=lambda x: x[1], reverse=True) |
|
final_results = results[:num_words] |
|
|
|
logger.info(f"✅ Generated {len(final_results)} thematic words") |
|
return final_results |
|
|
|
def _compute_theme_vector(self, inputs: List[str]) -> np.ndarray: |
|
"""Compute semantic centroid from input words/sentences.""" |
|
logger.info(f"🎯 Computing theme vector for {len(inputs)} inputs") |
|
|
|
|
|
input_embeddings = self.model.encode(inputs, convert_to_tensor=False, show_progress_bar=False) |
|
logger.info(f"✅ Encoded {len(inputs)} inputs") |
|
|
|
|
|
theme_vector = np.mean(input_embeddings, axis=0) |
|
|
|
return theme_vector.reshape(1, -1) |
|
|
|
def _compute_composite_score(self, similarity: float, word: str, difficulty: str = "medium") -> float: |
|
""" |
|
Combine semantic similarity with frequency-based difficulty alignment using ML feature engineering. |
|
|
|
This is the core of the difficulty-aware selection system. It creates a composite score |
|
that balances two key factors: |
|
1. Semantic Relevance: How well the word matches the theme (similarity score) |
|
2. Difficulty Alignment: How well the word's frequency matches the desired difficulty |
|
|
|
Frequency Alignment uses Gaussian distributions to create smooth preference curves: |
|
|
|
Easy Mode (targets common words): |
|
- Gaussian peak at 90th percentile with narrow width (σ=0.1) |
|
- Words like CAT (95th percentile) get high scores |
|
- Words like QUETZAL (15th percentile) get low scores |
|
- Formula: exp(-((percentile - 0.9)² / (2 * 0.1²))) |
|
|
|
Hard Mode (targets rare words): |
|
- Gaussian peak at 20th percentile with moderate width (σ=0.15) |
|
- Words like QUETZAL (15th percentile) get high scores |
|
- Words like CAT (95th percentile) get low scores |
|
- Formula: exp(-((percentile - 0.2)² / (2 * 0.15²))) |
|
|
|
Medium Mode (balanced): |
|
- Flatter distribution with slight peak at 50th percentile (σ=0.3) |
|
- Base score of 0.5 plus Gaussian bonus |
|
- Less extreme preference, more balanced selection |
|
- Formula: 0.5 + 0.5 * exp(-((percentile - 0.5)² / (2 * 0.3²))) |
|
|
|
Final Weighting: |
|
composite = (1 - difficulty_weight) * similarity + difficulty_weight * frequency_alignment |
|
|
|
Where difficulty_weight (default 0.3) controls the balance: |
|
- Higher weight = more frequency influence, less similarity influence |
|
- Lower weight = more similarity influence, less frequency influence |
|
|
|
Example Calculations: |
|
Theme: "animals", difficulty_weight=0.3 |
|
|
|
Easy mode: |
|
        - CAT: similarity=0.8, percentile=0.95 → freq_score≈0.88 → composite≈0.82
        - PLATYPUS: similarity=0.9, percentile=0.15 → freq_score≈0.00 → composite≈0.63
        - Result: CAT wins despite lower similarity (common word bonus)

        Hard mode:
        - CAT: similarity=0.8, percentile=0.95 → freq_score≈0.00 → composite≈0.56
        - PLATYPUS: similarity=0.9, percentile=0.15 → freq_score≈0.95 → composite≈0.91
        - Result: PLATYPUS wins due to rarity bonus
|
|
|
Args: |
|
similarity: Semantic similarity score (0-1) from sentence transformer |
|
word: The word to get percentile for |
|
difficulty: "easy", "medium", or "hard" - determines frequency preference curve |
|
|
|
Returns: |
|
Composite score (0-1) combining semantic relevance and difficulty alignment |
|
""" |
|
|
|
percentile = self.word_percentiles.get(word.lower(), 0.0) |
|
|
|
|
|
if difficulty == "easy": |
|
|
|
freq_score = np.exp(-((percentile - 0.9) ** 2) / (2 * 0.1 ** 2)) |
|
elif difficulty == "hard": |
|
|
|
freq_score = np.exp(-((percentile - 0.2) ** 2) / (2 * 0.15 ** 2)) |
|
else: |
|
|
|
freq_score = 0.5 + 0.5 * np.exp(-((percentile - 0.5) ** 2) / (2 * 0.3 ** 2)) |
|
|
|
|
|
final_alpha = 1.0 - self.difficulty_weight |
|
final_beta = self.difficulty_weight |
|
|
|
composite = final_alpha * similarity + final_beta * freq_score |
|
return composite |
|
|
|
def _softmax_with_temperature(self, scores: np.ndarray, temperature: float = 1.0) -> np.ndarray: |
|
""" |
|
Apply softmax with temperature control to similarity scores. |
|
|
|
Args: |
|
scores: Array of similarity scores |
|
temperature: Temperature parameter (lower = more deterministic, higher = more random) |
|
- temperature < 1.0: More deterministic (favor high similarity) |
|
- temperature = 1.0: Standard softmax |
|
- temperature > 1.0: More random (flatten differences) |
|
|
|
Returns: |
|
Probability distribution over the scores |
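
        Example (illustrative): scores [0.9, 0.5, 0.1] at temperature 0.7 map to
        roughly [0.53, 0.30, 0.17]; at temperature 2.0 they flatten to roughly
        [0.40, 0.33, 0.27].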
|
""" |
|
if temperature <= 0: |
|
temperature = 0.01 |
|
|
|
|
|
scaled_scores = scores / temperature |
|
|
|
|
|
max_score = np.max(scaled_scores) |
|
exp_scores = np.exp(scaled_scores - max_score) |
|
probabilities = exp_scores / np.sum(exp_scores) |
|
|
|
return probabilities |
|
|
|
def _softmax_weighted_selection(self, candidates: List[Tuple[str, float, str]], |
|
                                    num_words: int, temperature: Optional[float] = None, difficulty: str = "medium") -> List[Tuple[str, float, str]]:
|
""" |
|
Select words using softmax-based probabilistic sampling weighted by composite scores. |
|
|
|
This function implements a machine learning approach to word selection that combines: |
|
1. Semantic similarity (how relevant the word is to the theme) |
|
2. Frequency percentiles (how common/rare the word is) |
|
3. Difficulty preference (which frequencies are preferred for easy/medium/hard) |
|
4. Temperature-controlled randomness (exploration vs exploitation balance) |
|
|
|
Temperature Effects: |
|
- temperature < 1.0: More deterministic selection, strongly favors highest composite scores |
|
- temperature = 1.0: Standard softmax probability distribution |
|
- temperature > 1.0: More random selection, flattens differences between scores |
|
- Default 0.7: Balanced between determinism and exploration |
|
|
|
Difficulty Effects (via composite scoring): |
|
- "easy": Gaussian peak at 90th percentile (favors common words like CAT, DOG) |
|
- "medium": Balanced distribution around 50th percentile (moderate preference) |
|
- "hard": Gaussian peak at 20th percentile (favors rare words like QUETZAL, PLATYPUS) |
|
|
|
Composite Score Formula: |
|
composite = (1 - difficulty_weight) * similarity + difficulty_weight * frequency_alignment |
|
|
|
Where frequency_alignment uses Gaussian curves to score how well a word's |
|
percentile matches the difficulty preference. |
|
|
|
Example Scenario: |
|
Theme: "animals", Easy difficulty, Temperature: 0.7 |
|
- CAT: similarity=0.8, percentile=0.95 → high composite score (common + relevant) |
|
- PLATYPUS: similarity=0.9, percentile=0.15 → lower composite (rare word penalized in easy mode) |
|
- Result: CAT more likely to be selected despite lower similarity |
|
|
|
Args: |
|
candidates: List of (word, similarity_score, tier) tuples |
|
num_words: Number of words to select |
|
temperature: Temperature for softmax (None to use instance default of 0.7) |
|
difficulty: Difficulty level ("easy", "medium", "hard") for frequency weighting |
|
|
|
Returns: |
|
Selected words with original similarity scores and tiers, |
|
sampled without replacement according to composite probabilities |
|
""" |
|
if len(candidates) <= num_words: |
|
return candidates |
|
|
|
if temperature is None: |
|
temperature = self.similarity_temperature |
|
|
|
|
|
composite_scores = [] |
|
for word, similarity_score, tier in candidates: |
|
composite = self._compute_composite_score(similarity_score, word, difficulty) |
|
composite_scores.append(composite) |
|
|
|
composite_scores = np.array(composite_scores) |
|
|
|
|
|
probabilities = self._softmax_with_temperature(composite_scores, temperature) |
|
|
|
|
|
selected_indices = np.random.choice( |
|
len(candidates), |
|
size=min(num_words, len(candidates)), |
|
replace=False, |
|
p=probabilities |
|
) |
|
|
|
|
|
selected_candidates = [candidates[i] for i in selected_indices] |
|
|
|
logger.info(f"🎲 Composite softmax selection (T={temperature:.2f}, difficulty={difficulty}): {len(selected_candidates)} from {len(candidates)} candidates") |
|
|
|
return selected_candidates |
|
|
|
def _detect_multiple_themes(self, inputs: List[str], max_themes: int = 3) -> List[np.ndarray]: |
|
"""Detect multiple themes using clustering.""" |
|
if len(inputs) < 2: |
|
return [self._compute_theme_vector(inputs)] |
|
|
|
logger.info(f"🔍 Detecting multiple themes from {len(inputs)} inputs") |
|
|
|
|
|
input_embeddings = self.model.encode(inputs, convert_to_tensor=False, show_progress_bar=False) |
|
logger.info("✅ Encoded inputs for clustering") |
|
|
|
|
|
n_clusters = min(max_themes, len(inputs), 3) |
|
logger.info(f"📊 Using {n_clusters} clusters for theme detection") |
|
|
|
if n_clusters == 1: |
|
return [np.mean(input_embeddings, axis=0).reshape(1, -1)] |
|
|
|
|
|
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) |
|
kmeans.fit(input_embeddings) |
|
|
|
logger.info(f"✅ Clustered inputs into {n_clusters} themes") |
|
|
|
|
|
return [center.reshape(1, -1) for center in kmeans.cluster_centers_] |
|
|
|
def get_tier_words(self, tier: str, limit: int = 1000) -> List[str]: |
|
"""Get all words from a specific frequency tier. |
|
|
|
Args: |
|
tier: Frequency tier name (e.g., "tier_5_common") |
|
limit: Maximum number of words to return |
|
|
|
Returns: |
|
List of words in the specified tier |
|
""" |
|
if not self.is_initialized: |
|
self.initialize() |
|
|
|
tier_words = [word for word, word_tier in self.frequency_tiers.items() |
|
if word_tier == tier] |
|
|
|
return tier_words[:limit] |
|
|
|
def get_word_info(self, word: str) -> Dict[str, Any]: |
|
"""Get comprehensive information about a word. |
|
|
|
Args: |
|
word: Word to get information for |
|
|
|
Returns: |
|
Dictionary with word info including frequency, tier, etc. |
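
        Example (illustrative):
            {'word': 'cat', 'in_vocabulary': True, 'frequency': 1234567,
             'tier': 'tier_3_very_common', 'tier_description': 'Very Common (Top 1%)'}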
|
""" |
|
if not self.is_initialized: |
|
self.initialize() |
|
|
|
word_lower = word.lower() |
|
|
|
info = { |
|
'word': word, |
|
'in_vocabulary': word_lower in self.vocabulary, |
|
'frequency': self.word_frequencies.get(word_lower, 0), |
|
'tier': self.frequency_tiers.get(word_lower, "tier_10_very_rare"), |
|
'tier_description': self.tier_descriptions.get( |
|
self.frequency_tiers.get(word_lower, "tier_10_very_rare"), |
|
"Unknown" |
|
) |
|
} |
|
|
|
return info |
|
|
|
|
|
async def find_similar_words(self, topic: str, difficulty: str = "medium", max_words: int = 15) -> List[Dict[str, Any]]: |
|
"""Backend-compatible method for finding similar words. |
|
|
|
Returns list of word dictionaries compatible with crossword_generator.py |
|
Expected format: [{"word": str, "clue": str}, ...] |
|
""" |
|
|
|
difficulty_tier_map = { |
|
"easy": [ "tier_2_extremely_common", "tier_3_very_common", "tier_4_highly_common"], |
|
"medium": ["tier_4_highly_common", "tier_5_common", "tier_6_moderately_common", "tier_7_somewhat_uncommon"], |
|
"hard": ["tier_7_somewhat_uncommon", "tier_8_uncommon", "tier_9_rare"] |
|
} |
|
|
|
allowed_tiers = difficulty_tier_map.get(difficulty, difficulty_tier_map["medium"]) |
|
|
|
|
|
        all_results = self.generate_thematic_words(
            topic,
            num_words=max_words * 2,
            min_similarity=0.3,
            difficulty=difficulty
        )
|
|
|
|
|
backend_words = [] |
|
for word, similarity, tier in all_results: |
|
|
|
            # Length-based difficulty check
            if not self._matches_backend_difficulty(word, difficulty):
                continue

            # Frequency-tier check: keep only words whose tier is allowed for this difficulty
            if tier not in allowed_tiers:
                continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
backend_word = { |
|
"word": word.upper(), |
|
"clue": self._generate_simple_clue(word, topic), |
|
"similarity": similarity, |
|
"tier": tier |
|
} |
|
|
|
backend_words.append(backend_word) |
|
|
|
if len(backend_words) >= max_words: |
|
break |
|
|
|
logger.info(f"🎯 Generated {len(backend_words)} words for topic '{topic}' (difficulty: {difficulty})") |
|
return backend_words |
|
|
|
def _matches_backend_difficulty(self, word: str, difficulty: str) -> bool: |
|
"""Check if word matches backend difficulty criteria.""" |
|
difficulty_map = { |
|
"easy": {"min_len": 3, "max_len": 8}, |
|
"medium": {"min_len": 4, "max_len": 10}, |
|
"hard": {"min_len": 5, "max_len": 15} |
|
} |
|
|
|
criteria = difficulty_map.get(difficulty, difficulty_map["medium"]) |
|
return criteria["min_len"] <= len(word) <= criteria["max_len"] |
|
|
|
def _generate_simple_clue(self, word: str, topic: str) -> str: |
|
"""Generate a simple clue for the word (backend compatibility).""" |
|
|
|
word_lower = word.lower() |
|
topic_lower = topic.lower() |
|
|
|
|
|
if "animal" in topic_lower: |
|
return f"{word_lower} (animal)" |
|
elif "tech" in topic_lower or "computer" in topic_lower: |
|
return f"{word_lower} (technology)" |
|
elif "science" in topic_lower: |
|
return f"{word_lower} (science)" |
|
elif "geo" in topic_lower or "place" in topic_lower: |
|
return f"{word_lower} (geography)" |
|
elif "food" in topic_lower: |
|
return f"{word_lower} (food)" |
|
else: |
|
return f"{word_lower} (related to {topic_lower})" |
|
|
|
def get_vocabulary_size(self) -> int: |
|
"""Get the size of the loaded vocabulary.""" |
|
return len(self.vocabulary) |
|
|
|
def get_tier_distribution(self) -> Dict[str, int]: |
|
"""Get distribution of words across frequency tiers.""" |
|
if not self.frequency_tiers: |
|
return {} |
|
|
|
tier_counts = Counter(self.frequency_tiers.values()) |
|
return dict(tier_counts) |
|
|
|
|
|
|
|
# Backwards-compatible alias for code that still imports the old class name
ThematicWordGenerator = UnifiedThematicWordGenerator
|
|
|
def main(): |
|
"""Demo the unified thematic word generator.""" |
|
print("🚀 Unified Thematic Word Generator Demo") |
|
print("=" * 60) |
|
|
|
|
|
print("🔄 Initializing generator (this may take a moment)...") |
|
generator = UnifiedThematicWordGenerator(vocab_size_limit=50000) |
|
generator.initialize() |
|
|
|
|
|
test_topics = ["cat", "science", "computer", "ocean", "music"] |
|
|
|
print("\n📊 Vocabulary Statistics:") |
|
print(f"Total vocabulary: {generator.get_vocabulary_size():,} words") |
|
print(f"Tier distribution: {generator.get_tier_distribution()}") |
|
|
|
print("\n🎯 Thematic Word Generation:") |
|
print("=" * 60) |
|
|
|
for topic in test_topics: |
|
print(f"\nTopic: '{topic}'") |
|
print("-" * 30) |
|
|
|
|
|
results = generator.generate_thematic_words(topic, num_words=8) |
|
|
|
if results: |
|
for word, similarity, tier in results: |
|
tier_desc = generator.tier_descriptions.get(tier, tier) |
|
print(f" {word:<15} (sim: {similarity:.3f}, {tier_desc})") |
|
else: |
|
print(" No results found.") |
|
|
|
print("\n🎯 Tier-Specific Generation:") |
|
print("=" * 60) |
|
|
|
|
|
    # generate_thematic_words() has no tier argument, so over-generate and keep tier_5_common words
    tier_results = [
        result for result in generator.generate_thematic_words("animal", num_words=30)
        if result[2] == "tier_5_common"
    ][:5]
|
|
|
print(f"\nCommon animal words:") |
|
for word, similarity, tier in tier_results: |
|
print(f" {word:<15} (similarity: {similarity:.3f})") |
|
|
|
|
|
print("\n" + "=" * 60) |
|
print("🎮 INTERACTIVE MODE") |
|
print("=" * 60) |
|
print("Commands:") |
|
print(" <topic> - Generate words for single topic") |
|
print(" <input1>, <input2>, <input3> - Generate words for multiple topics (comma-separated)") |
|
print(" \"<sentence>\" - Generate words from sentence theme") |
|
print(" <input> <num_words> - Generate specific number of words") |
|
print(" <input> tier <tier_name> - Generate words from specific tier") |
|
print(" <input> difficulty <level> - Generate words by difficulty (easy/medium/hard)") |
|
print(" <input> multi - Force multi-theme detection") |
|
print(" info <word> - Get word information") |
|
print(" tiers - Show all available tiers") |
|
print(" stats - Show vocabulary statistics") |
|
print(" help - Show this help") |
|
print(" quit - Exit") |
|
print() |
|
print("Examples:") |
|
print(" I love animals # Single sentence theme") |
|
print(" cats, dogs, pets # Multiple topics (auto multi-theme)") |
|
print(" \"I love you, moonpie, chocolate\" # Mixed: sentence + words") |
|
print(" science, technology 15 # 15 words from multiple topics") |
|
print(" animal tier tier_5_common # Single topic, specific tier") |
|
print() |
|
print("Note: Multi-theme is automatically enabled for 3+ inputs") |
|
print() |
|
|
|
while True: |
|
try: |
|
user_input = input("🎯 Enter command: ").strip() |
|
|
|
if user_input.lower() in ['quit', 'exit', 'q']: |
|
break |
|
|
|
if not user_input: |
|
continue |
|
|
|
parts = user_input.split() |
|
|
|
            if user_input.lower() == 'help':
                print()
                print_help()
                continue
|
|
|
elif user_input.lower() == 'stats': |
|
print(f"\n📊 Vocabulary Statistics:") |
|
print(f" Total words: {generator.get_vocabulary_size():,}") |
|
tier_dist = generator.get_tier_distribution() |
|
print(f" Tier distribution:") |
|
for tier, count in sorted(tier_dist.items()): |
|
tier_desc = generator.tier_descriptions.get(tier, tier) |
|
print(f" {tier_desc}: {count:,}") |
|
continue |
|
|
|
elif user_input.lower() == 'tiers': |
|
print(f"\n🎯 Available Frequency Tiers:") |
|
for tier_name, description in sorted(generator.tier_descriptions.items()): |
|
count = generator.get_tier_distribution().get(tier_name, 0) |
|
print(f" {tier_name}: {description} ({count:,} words)") |
|
continue |
|
|
|
elif parts[0].lower() == 'info' and len(parts) > 1: |
|
word = parts[1] |
|
info = generator.get_word_info(word) |
|
print(f"\n📝 Word Information: '{word}'") |
|
print(f" In vocabulary: {info['in_vocabulary']}") |
|
print(f" Frequency: {info['frequency']:,}") |
|
print(f" Tier: {info['tier']}") |
|
print(f" Description: {info['tier_description']}") |
|
continue |
|
|
|
|
|
|
|
if user_input.startswith('"') and '"' in user_input[1:]: |
|
|
|
quote_end = user_input.index('"', 1) |
|
quoted_content = user_input[1:quote_end] |
|
remaining = user_input[quote_end + 1:].strip() |
|
|
|
|
|
if ',' in quoted_content: |
|
|
|
inputs = [item.strip() for item in quoted_content.split(',') if item.strip()] |
|
else: |
|
|
|
inputs = [quoted_content] |
|
|
|
|
|
remaining_parts = remaining.split() if remaining else [] |
|
else: |
|
|
|
|
|
param_keywords = ['tier', 'difficulty', 'multi'] |
|
input_end = len(parts) |
|
|
|
for i, part in enumerate(parts): |
|
if part.lower() in param_keywords or part.isdigit(): |
|
input_end = i |
|
break |
|
|
|
|
|
input_text = ' '.join(parts[:input_end]) |
|
remaining_parts = parts[input_end:] |
|
|
|
|
|
if ',' in input_text: |
|
|
|
inputs = [item.strip() for item in input_text.split(',') if item.strip()] |
|
else: |
|
|
|
|
|
inputs = [input_text] if input_text.strip() else [] |
|
|
|
|
|
num_words = 10 |
|
difficulty_tier = None |
|
difficulty_level = None |
|
multi_theme = False |
|
|
|
i = 0 |
|
while i < len(remaining_parts): |
|
if remaining_parts[i].lower() == 'tier' and i + 1 < len(remaining_parts): |
|
difficulty_tier = remaining_parts[i + 1] |
|
i += 2 |
|
elif remaining_parts[i].lower() == 'difficulty' and i + 1 < len(remaining_parts): |
|
difficulty_level = remaining_parts[i + 1] |
|
i += 2 |
|
elif remaining_parts[i].lower() == 'multi': |
|
multi_theme = True |
|
i += 1 |
|
elif remaining_parts[i].isdigit(): |
|
num_words = int(remaining_parts[i]) |
|
i += 1 |
|
else: |
|
i += 1 |
|
|
|
|
|
if isinstance(inputs, str): |
|
print(f"\n🎯 Words for: '{inputs}'") |
|
else: |
|
print(f"\n🎯 Words for: {inputs}") |
|
if multi_theme: |
|
print("🔍 Using multi-theme detection") |
|
print("-" * 50) |
|
|
|
try: |
|
if difficulty_level: |
|
|
|
|
|
if isinstance(inputs, list): |
|
topic_for_backend = ' '.join(inputs) |
|
else: |
|
topic_for_backend = inputs |
|
|
|
                    backend_results = asyncio.run(generator.find_similar_words(topic_for_backend, difficulty_level, num_words))
|
|
|
if backend_results: |
|
for word_data in backend_results: |
|
word = word_data['word'] |
|
tier = word_data.get('tier', 'unknown') |
|
similarity = word_data.get('similarity', 0.0) |
|
tier_desc = generator.tier_descriptions.get(tier, tier) |
|
print(f" {word.lower():<15} (sim: {similarity:.3f}, {tier_desc})") |
|
else: |
|
print(" No words found for this difficulty level.") |
|
else: |
|
|
|
                    results = generator.generate_thematic_words(
                        inputs,
                        num_words=num_words if difficulty_tier is None else num_words * 3,
                        multi_theme=multi_theme
                    )

                    # generate_thematic_words() has no tier argument, so filter by tier after generation
                    if difficulty_tier:
                        results = [r for r in results if r[2] == difficulty_tier][:num_words]
|
|
|
if results: |
|
|
|
tier_groups = {} |
|
for word, similarity, tier in results: |
|
if tier not in tier_groups: |
|
tier_groups[tier] = [] |
|
tier_groups[tier].append((word, similarity)) |
|
|
|
|
|
tier_order = [ |
|
"tier_1_ultra_common", |
|
"tier_2_extremely_common", |
|
"tier_3_very_common", |
|
"tier_4_highly_common", |
|
"tier_5_common", |
|
"tier_6_moderately_common", |
|
"tier_7_somewhat_uncommon", |
|
"tier_8_uncommon", |
|
"tier_9_rare", |
|
"tier_10_very_rare" |
|
] |
|
|
|
|
|
for tier in tier_order: |
|
if tier in tier_groups: |
|
tier_desc = generator.tier_descriptions.get(tier, tier) |
|
print(f"\n 📊 {tier_desc}:") |
|
|
|
tier_words = sorted(tier_groups[tier], key=lambda x: x[0]) |
|
for word, similarity in tier_words: |
|
print(f" {word:<15} (similarity: {similarity:.3f})") |
|
else: |
|
print(" No words found. Try a different topic or tier.") |
|
|
|
except Exception as e: |
|
print(f" ❌ Error generating words: {e}") |
|
|
|
except KeyboardInterrupt: |
|
print("\n\n👋 Interrupted by user") |
|
break |
|
except Exception as e: |
|
print(f"❌ Error: {e}") |
|
print("Type 'help' for available commands") |
|
|
|
print("\n✅ Thanks for using Unified Thematic Word Generator!") |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|