"""
Frequency Analyzer for Brown Corpus Data

Reads cached frequency data and provides comprehensive analysis and
visualization of the word frequency distribution from the NLTK Brown corpus,
with optional external sources (wordfreq, Google Books Ngram).
"""

import csv
import os
import pickle
import subprocess
import sys
from collections import Counter
from typing import Dict, List, Optional, Tuple

import numpy as np

try:
    import matplotlib
    matplotlib.use('TkAgg')  # select the backend before importing pyplot
    import matplotlib.pyplot as plt
    HAS_MATPLOTLIB = True
except ImportError:
    HAS_MATPLOTLIB = False
    print("Note: matplotlib not available. Visualizations will be disabled.")

try:
    from wordfreq import word_frequency
    HAS_WORDFREQ = True
except ImportError:
    HAS_WORDFREQ = False
    print("Note: wordfreq not available. External frequency sources will be disabled.")


class FrequencySource:
    """Represents a word frequency data source."""

    def __init__(self, name: str, description: str, url: Optional[str] = None,
                 filename: Optional[str] = None, parser=None):
        self.name = name
        self.description = description
        self.url = url
        self.filename = filename
        self.parser = parser
        self.frequencies = None
        self.total_words = 0
        self.unique_words = 0

    def is_available(self, cache_dir: str) -> bool:
        """Check if source data is available locally."""
        if self.name == 'wordfreq':
            return HAS_WORDFREQ
        elif self.filename:
            return os.path.exists(os.path.join(cache_dir, self.filename))
        return False

    def load_data(self, cache_dir: str) -> bool:
        """Load frequency data for this source."""
        if not self.is_available(cache_dir):
            return False

        try:
            if self.name == 'wordfreq':
                # wordfreq builds its dataset programmatically; no file needed.
                self.frequencies = self.parser()
            elif self.parser:
                filepath = os.path.join(cache_dir, self.filename)
                self.frequencies = self.parser(filepath)
            else:
                # Default path: a pickled {word: count} mapping.
                filepath = os.path.join(cache_dir, self.filename)
                with open(filepath, 'rb') as f:
                    self.frequencies = pickle.load(f)

            if self.frequencies:
                # Normalize to a Counter so callers can rely on .most_common().
                if not isinstance(self.frequencies, Counter):
                    self.frequencies = Counter(self.frequencies)
                self.total_words = sum(self.frequencies.values())
                self.unique_words = len(self.frequencies)
                return True
        except Exception as e:
            print(f"Error loading {self.name}: {e}")

        return False
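
# Example (sketch, not one of the shipped sources): registering a custom
# frequency source backed by a pickled {word: count} mapping. The filename
# below is hypothetical; any pickle readable by the default (parser=None)
# branch of FrequencySource.load_data() works.
#
#   source = FrequencySource(
#       name='my_corpus',
#       description='Custom corpus frequencies',
#       filename='my_corpus_frequencies.pkl',
#   )
#   if source.load_data('model_cache'):
#       print(f"{source.unique_words:,} unique words loaded")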


class FrequencyAnalyzer:
    def __init__(self, cache_dir: Optional[str] = None):
        """Initialize the frequency analyzer."""
        if cache_dir is None:
            cache_dir = os.path.join(os.path.dirname(__file__), 'model_cache')

        self.cache_dir = cache_dir
        self.word_frequencies = None
        self.total_words = 0
        self.unique_words = 0
        self.frequency_tiers = {}
        self.tier_descriptions = {}
        self.current_source = None

        self.frequency_sources = self._initialize_frequency_sources()

        self.load_default_source()

    def _initialize_frequency_sources(self) -> Dict[str, FrequencySource]:
        """Initialize available frequency sources."""
        sources = {}

        sources['brown'] = FrequencySource(
            name='brown',
            description='NLTK Brown Corpus (1960s, ~1.1M words)',
            filename='brown_frequencies.pkl'
        )

        if HAS_WORDFREQ:
            sources['wordfreq'] = FrequencySource(
                name='wordfreq',
                description='WordFreq Multi-source (Wikipedia, subtitles, news, books, web, Twitter, Reddit, ~2021)',
                parser=self._parse_wordfreq_data
            )

        sources['google_ngram'] = FrequencySource(
            name='google_ngram',
            description='Google Books Ngram Corpus (v3, 2020) - real data, download required',
            filename='google_ngram_frequencies.pkl'
        )

        return sources

    def _parse_wordfreq_data(self, filepath: Optional[str] = None) -> Optional[Counter]:
        """Build a frequency dataset from wordfreq's own vocabulary."""
        if not HAS_WORDFREQ:
            return None

        print("Generating wordfreq dataset using wordfreq's vocabulary...")

        try:
            from wordfreq import available_languages, top_n_list
            print(f"WordFreq available languages: {available_languages()}")

            print("Fetching top words from wordfreq vocabulary...")

            # Try progressively smaller vocabularies until one succeeds.
            word_counts = [500000, 100000, 50000, 25000]

            frequency_data = Counter()

            for max_words in word_counts:
                try:
                    print(f"Attempting to fetch top {max_words:,} words...")

                    top_words = top_n_list('en', max_words, wordlist='large')

                    print(f"Retrieved {len(top_words):,} words from wordfreq")

                    floored_words = []  # words whose scaled count rounded to 0
                    zero_freq_words = 0
                    total_processed = 0

                    for i, word in enumerate(top_words):
                        try:
                            freq = word_frequency(word, 'en', wordlist='large')
                            total_processed += 1

                            if freq > 0:
                                log_bin = int(-np.log10(freq))

                                # Scale the relative frequency to a pseudo-count
                                # per billion tokens so it behaves like the
                                # corpus-count sources.
                                count = int(freq * 1_000_000_000)
                                if count > 0:
                                    frequency_data[word] = count
                                else:
                                    # Keep very rare words with a floor count of 1.
                                    frequency_data[word] = 1
                                    floored_words.append((word, freq, log_bin))
                            else:
                                zero_freq_words += 1

                            if (i + 1) % 5000 == 0:
                                print(f"  Processed {i+1:,}/{len(top_words):,} words ({len(frequency_data):,} with frequencies)")

                        except Exception:
                            continue

                    print("\nWordFreq Processing Results:")
                    print(f"  Total processed: {total_processed:,}")
                    print(f"  Words with frequencies: {len(frequency_data):,}")
                    print(f"  Words floored to count 1 (tiny freq): {len(floored_words):,}")
                    print(f"  Words with zero frequency: {zero_freq_words:,}")

                    if floored_words:
                        print("\nFrequency distribution of floored words:")
                        bin_counts = Counter(log_bin for _, _, log_bin in floored_words)
                        for bin_num in sorted(bin_counts.keys()):
                            print(f"  Bin {bin_num} (freq ~1e-{bin_num}): {bin_counts[bin_num]:,} words")

                        print("\nSample floored words by frequency bin:")
                        bins_sample = {}
                        for word, freq, log_bin in floored_words[:50]:
                            if log_bin not in bins_sample:
                                bins_sample[log_bin] = []
                            if len(bins_sample[log_bin]) < 3:
                                bins_sample[log_bin].append((word, freq))

                        for bin_num in sorted(bins_sample.keys()):
                            print(f"  Bin {bin_num}: {', '.join(f'{w}({f:.2e})' for w, f in bins_sample[bin_num])}")

                    if len(frequency_data) > 1000:
                        break

                except Exception as e:
                    print(f"Failed to fetch {max_words:,} words: {e}")
                    continue

            if len(frequency_data) == 0:
                print("Fallback: generating frequencies for common words manually...")

                common_words = [
                    "the", "be", "to", "of", "and", "a", "in", "that", "have", "i",
                    "it", "for", "not", "on", "with", "he", "as", "you", "do", "at",
                    "this", "but", "his", "by", "from", "they", "she", "or", "an", "will",
                    "my", "one", "all", "would", "there", "their", "what", "so", "up", "out"
                ]

                for word in common_words:
                    try:
                        freq = word_frequency(word, 'en', wordlist='large')
                        if freq > 0:
                            # Use the same per-billion scale as the main path.
                            count = int(freq * 1_000_000_000)
                            if count > 0:
                                frequency_data[word] = count
                    except Exception:
                        continue

            print(f"✓ Generated wordfreq dataset: {len(frequency_data):,} words with real frequencies")
            return frequency_data

        except ImportError as e:
            print(f"Could not access wordfreq vocabulary functions: {e}")
            return None
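
    # Worked example of the scaling above: word_frequency("the", "en") is
    # roughly 0.05, so "the" is stored with a pseudo-count of about
    # int(0.05 * 1_000_000_000) = 50,000,000 per billion tokens, while a
    # word at freq 3e-10 rounds to 0 and is floored to a count of 1.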

    def load_default_source(self):
        """Load the best available frequency source."""
        # Prefer the larger, more recent wordfreq data; fall back to Brown.
        priority_sources = ['wordfreq', 'brown']

        for source_name in priority_sources:
            if self.switch_source(source_name):
                break

        if not self.current_source:
            print("Warning: No frequency sources available!")

    def switch_source(self, source_name: str) -> bool:
        """Switch to a different frequency source."""
        if source_name not in self.frequency_sources:
            print(f"Unknown source: {source_name}")
            return False

        source = self.frequency_sources[source_name]

        if source.load_data(self.cache_dir):
            self.current_source = source
            self.word_frequencies = source.frequencies
            self.total_words = source.total_words
            self.unique_words = source.unique_words
            self.create_frequency_tiers()
            print(f"✓ Switched to {source.name}: {source.description}")
            return True
        else:
            print(f"✗ Source {source_name} not available. Use 'download {source_name}' to get it.")
            return False

    def download_source(self, source_name: str) -> bool:
        """Download an external frequency source."""
        if source_name not in self.frequency_sources:
            print(f"Unknown source: {source_name}")
            return False

        if source_name == 'google_ngram':
            return self._download_google_ngram()
        else:
            print(f"Download not implemented for {source_name}")
            return False

    def _download_google_ngram(self) -> bool:
        """Download and process real Google Books Ngram frequency data."""
        print("Downloading Google Books Ngram frequency data...")
        print("Using streaming download from github.com/orgtre/google-books-ngram-frequency")

        url = "https://raw.githubusercontent.com/orgtre/google-books-ngram-frequency/main/ngrams/1grams_english.csv"

        try:
            print(f"Downloading top entries from: {url}")
            print("Note: Processing first 100,000 entries (most frequent words)")

            result = subprocess.run([
                'curl', '-s', '-L', '--max-time', '60', url
            ], capture_output=True, text=True, timeout=90)

            if result.returncode != 0:
                raise Exception(f"curl failed: {result.stderr}")

            content = result.stdout
            print(f"Downloaded {len(content)} characters")

            frequency_data = Counter()
            lines = content.strip().split('\n')

            print(f"Processing {len(lines)} lines...")

            csv_reader = csv.DictReader(lines)

            for line_num, row in enumerate(csv_reader, 1):
                try:
                    word = row['ngram'].strip().lower()
                    freq_str = row['freq'].strip()

                    freq_value = float(freq_str.replace(',', ''))

                    if freq_value > 0:
                        # Keep single alphabetic ASCII words longer than one letter.
                        if len(word) > 1 and word.isalpha() and word.isascii():
                            frequency_data[word] = int(freq_value)

                    if line_num % 10000 == 0:
                        print(f"  Processed {line_num:,} lines, found {len(frequency_data):,} valid words")

                    if line_num >= 100000:
                        print("  Processed first 100,000 most frequent entries")
                        break

                except (ValueError, KeyError, IndexError):
                    continue

            if len(frequency_data) > 1000:
                cache_path = os.path.join(self.cache_dir, 'google_ngram_frequencies.pkl')
                with open(cache_path, 'wb') as f:
                    pickle.dump(frequency_data, f)

                print(f"✓ Downloaded Google Ngram data: {len(frequency_data):,} words")
                print(f"✓ Saved to: {cache_path}")
                return True
            else:
                print(f"✗ Not enough valid data found ({len(frequency_data)} words)")
                return False

        except Exception as e:
            print(f"✗ Failed to download Google Ngram data: {e}")
            return False
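
    # Portable alternative (sketch) to the curl subprocess above, using only
    # the standard library; the endpoint is plain CSV over HTTPS, so this
    # should yield the same content:
    #
    #   import urllib.request
    #   with urllib.request.urlopen(url, timeout=60) as resp:
    #       content = resp.read().decode('utf-8')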

    def list_sources(self):
        """List all available frequency sources."""
        print(f"\n{'='*70}")
        print("AVAILABLE FREQUENCY SOURCES")
        print(f"{'='*70}")
        print(f"{'Source':<12} {'Available':<10} {'Description'}")
        print("-" * 70)

        for name, source in self.frequency_sources.items():
            available = "✓ Yes" if source.is_available(self.cache_dir) else "✗ No"
            current = " (current)" if self.current_source and self.current_source.name == name else ""
            print(f"{name:<12} {available:<10} {source.description}{current}")

        print(f"\nCurrent source: {self.current_source.name if self.current_source else 'None'}")

    def compare_word_across_sources(self, word: str):
        """Compare how a word is classified across different sources."""
        print(f"\n{'='*70}")
        print(f"WORD COMPARISON: '{word}'")
        print(f"{'='*70}")
        print(f"{'Source':<12} {'Count':<8} {'Frequency':<12} {'Tier':<12} {'Available'}")
        print("-" * 70)

        current_source = self.current_source

        for name, source in self.frequency_sources.items():
            if source.is_available(self.cache_dir) and source.load_data(self.cache_dir):
                temp_freq = source.frequencies
                temp_total = sum(temp_freq.values())

                count = temp_freq.get(word.lower(), 0)
                freq = count / temp_total if temp_total > 0 else 0.0

                # Coarse tier by relative frequency (order-of-magnitude cutoffs).
                if freq > 0.001:
                    tier = "very_common"
                elif freq > 0.0001:
                    tier = "common"
                elif freq > 0.00001:
                    tier = "uncommon"
                else:
                    tier = "rare"

                print(f"{name:<12} {count:<8} {freq:<12.6f} {tier:<12} {'✓'}")
            else:
                print(f"{name:<12} {'N/A':<8} {'N/A':<12} {'N/A':<12} {'✗'}")

        # Re-sync analyzer state with the previously active source.
        if current_source:
            current_source.load_data(self.cache_dir)
            self.current_source = current_source
            self.word_frequencies = current_source.frequencies
            self.total_words = current_source.total_words
            self.unique_words = current_source.unique_words

    def load_frequency_data(self) -> bool:
        """Load cached Brown corpus frequency data."""
        freq_cache_path = os.path.join(self.cache_dir, 'brown_frequencies.pkl')

        if not os.path.exists(freq_cache_path):
            print(f"Error: Frequency cache not found at {freq_cache_path}")
            print("Please run the thematic word generator first to create the cache.")
            return False

        try:
            print("Loading frequency data from cache...")
            with open(freq_cache_path, 'rb') as f:
                self.word_frequencies = pickle.load(f)

            self.total_words = sum(self.word_frequencies.values())
            self.unique_words = len(self.word_frequencies)

            print("✓ Loaded frequency data:")
            print(f"  - Total word tokens: {self.total_words:,}")
            print(f"  - Unique words: {self.unique_words:,}")

            return True

        except Exception as e:
            print(f"Error loading frequency cache: {e}")
            return False

    def create_frequency_tiers(self):
        """Create detailed frequency tier classifications with 10 bins."""
        if not self.word_frequencies:
            return

        tiers = {}

        # Counts sorted in descending order; tier thresholds are percentile
        # cut points into this list.
        all_counts = sorted(self.word_frequencies.values(), reverse=True)

        tier_definitions = [
            ("tier_1_ultra_common", 0.999, "Ultra Common (Top 0.1%)"),
            ("tier_2_extremely_common", 0.995, "Extremely Common (Top 0.5%)"),
            ("tier_3_very_common", 0.99, "Very Common (Top 1%)"),
            ("tier_4_highly_common", 0.97, "Highly Common (Top 3%)"),
            ("tier_5_common", 0.92, "Common (Top 8%)"),
            ("tier_6_moderately_common", 0.85, "Moderately Common (Top 15%)"),
            ("tier_7_somewhat_uncommon", 0.70, "Somewhat Uncommon (Top 30%)"),
            ("tier_8_uncommon", 0.50, "Uncommon (Top 50%)"),
            ("tier_9_rare", 0.25, "Rare (Top 75%)"),
            ("tier_10_very_rare", 0.0, "Very Rare (Bottom 25%)")
        ]

        # Convert each percentile into a concrete count threshold.
        thresholds = []
        for tier_name, percentile, description in tier_definitions:
            if percentile > 0:
                idx = int((1 - percentile) * len(all_counts))
                threshold = all_counts[min(idx, len(all_counts) - 1)]
            else:
                threshold = 0
            thresholds.append((tier_name, threshold, description))

        # Assign each word to the first (most selective) tier it qualifies for.
        for word, count in self.word_frequencies.items():
            assigned = False
            for tier_name, threshold, description in thresholds:
                if count >= threshold:
                    tiers[word] = tier_name
                    assigned = True
                    break

            if not assigned:
                tiers[word] = "tier_10_very_rare"

        self.frequency_tiers = tiers
        self.tier_descriptions = {name: desc for name, _, desc in thresholds}

        tier_counts = Counter(tiers.values())
        print("\n✓ Frequency tier distribution (10-tier system):")

        tier_order = [f"tier_{i}_{name}" for i, name in enumerate([
            "ultra_common", "extremely_common", "very_common", "highly_common",
            "common", "moderately_common", "somewhat_uncommon", "uncommon",
            "rare", "very_rare"
        ], 1)]

        for tier_key in tier_order:
            if tier_key in tier_counts:
                count = tier_counts[tier_key]
                description = self.tier_descriptions.get(tier_key, tier_key)
                percentage = (count / len(tiers)) * 100
                print(f"  - {description}: {count:,} words ({percentage:.1f}%)")
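
    # Worked example of the cut-point math above: with 100,000 unique words,
    # the "Top 1%" boundary is all_counts[int((1 - 0.99) * 100_000)]
    # == all_counts[1000], roughly the count of the 1,000th most frequent
    # word; every word with at least that count lands in tier 3 or better.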

    def get_word_info(self, word: str) -> Tuple[int, float, str, int]:
        """Get detailed information about a word."""
        word = word.lower()
        count = self.word_frequencies.get(word, 0)
        relative_freq = count / self.total_words if self.total_words > 0 else 0.0
        tier = self.frequency_tiers.get(word, "tier_10_very_rare")

        # Rank is 1-based: one more than the number of words with a strictly
        # higher count (ties share the same rank).
        rank = 0
        if count > 0:
            rank = sum(1 for w, c in self.word_frequencies.items() if c > count) + 1

        return count, relative_freq, tier, rank
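
    # Example of the rank rule: with counts {"the": 100, "of": 80,
    # "and": 80, "to": 60}, both "of" and "and" get rank 2 (one word is
    # strictly more frequent), and "to" gets rank 4.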

    def show_top_words(self, n: int = 50):
        """Display the most common words."""
        print(f"\n{'='*60}")
        print(f"TOP {n} MOST COMMON WORDS")
        print(f"{'='*60}")
        print(f"{'Rank':<6} {'Word':<15} {'Count':<8} {'Frequency':<12} {'Tier'}")
        print("-" * 60)

        for i, (word, count) in enumerate(self.word_frequencies.most_common(n)):
            relative_freq = count / self.total_words
            tier = self.frequency_tiers.get(word, "tier_10_very_rare")
            print(f"{i+1:<6} {word:<15} {count:<8} {relative_freq:<12.6f} {tier}")

    def show_bottom_words(self, n: int = 50):
        """Display the least common words."""
        print(f"\n{'='*60}")
        print(f"BOTTOM {n} LEAST COMMON WORDS")
        print(f"{'='*60}")
        print(f"{'Word':<15} {'Count':<8} {'Frequency':<12} {'Tier'}")
        print("-" * 60)

        # Slice the tail of most_common() in reverse: the n rarest words,
        # least frequent first.
        bottom_words = self.word_frequencies.most_common()[:-n-1:-1]

        for word, count in bottom_words:
            relative_freq = count / self.total_words
            tier = self.frequency_tiers.get(word, "tier_10_very_rare")
            print(f"{word:<15} {count:<8} {relative_freq:<12.6f} {tier}")

    def show_frequency_ranges(self):
        """Show distribution of words across detailed frequency ranges."""
        print(f"\n{'='*70}")
        print("DETAILED FREQUENCY RANGE DISTRIBUTION")
        print(f"{'='*70}")

        # Decade-wide relative-frequency bands from >1e-2 down to <1e-10.
        ranges = [
            ("Ultra High (>1e-2)", lambda f: f > 1e-2),
            ("Extremely High (1e-3 to 1e-2)", lambda f: 1e-3 < f <= 1e-2),
            ("Very High (1e-4 to 1e-3)", lambda f: 1e-4 < f <= 1e-3),
            ("High (1e-5 to 1e-4)", lambda f: 1e-5 < f <= 1e-4),
            ("Moderately High (1e-6 to 1e-5)", lambda f: 1e-6 < f <= 1e-5),
            ("Medium (1e-7 to 1e-6)", lambda f: 1e-7 < f <= 1e-6),
            ("Moderately Low (1e-8 to 1e-7)", lambda f: 1e-8 < f <= 1e-7),
            ("Low (1e-9 to 1e-8)", lambda f: 1e-9 < f <= 1e-8),
            ("Very Low (1e-10 to 1e-9)", lambda f: 1e-10 < f <= 1e-9),
            ("Ultra Low (<1e-10)", lambda f: f <= 1e-10)
        ]

        print(f"{'Range':<30} {'Count':<10} {'Percentage'}")
        print("-" * 70)

        for range_name, condition in ranges:
            count = sum(1 for word, word_count in self.word_frequencies.items()
                        if condition(word_count / self.total_words))
            percentage = (count / self.unique_words) * 100
            print(f"{range_name:<30} {count:>8,} words ({percentage:>5.1f}%)")

    def show_tier_samples(self, n: int = 5):
        """Show sample words from each frequency tier."""
        print(f"\n{'='*80}")
        print(f"SAMPLE WORDS BY TIER (showing {n} per tier)")
        print(f"{'='*80}")

        tier_order = [f"tier_{i}_{name}" for i, name in enumerate([
            "ultra_common", "extremely_common", "very_common", "highly_common",
            "common", "moderately_common", "somewhat_uncommon", "uncommon",
            "rare", "very_rare"
        ], 1)]

        tier_samples = {tier: [] for tier in tier_order}

        # Take the first n words encountered for each tier.
        for word, tier in self.frequency_tiers.items():
            if tier in tier_samples and len(tier_samples[tier]) < n:
                count, freq, _, rank = self.get_word_info(word)
                tier_samples[tier].append((word, count, freq, rank))

        for tier in tier_order:
            if tier_samples[tier]:
                description = self.tier_descriptions.get(tier, tier)
                print(f"\n{description}:")
                print(f"{'Word':<15} {'Count':<12} {'Frequency':<12} {'Rank'}")
                print("-" * 55)

                for word, count, freq, rank in tier_samples[tier]:
                    print(f"{word:<15} {count:<12,} {freq:<12.8f} {rank:,}")

    def lookup_word(self, word: str):
        """Look up detailed information for a specific word."""
        count, freq, tier, rank = self.get_word_info(word)

        print(f"\nWord: '{word}'")
        print(f"  Count: {count:,}")
        print(f"  Frequency: {freq:.8f}")
        print(f"  Tier: {tier}")
        print(f"  Rank: {rank:,} (out of {self.unique_words:,})")

        if count == 0:
            print("  Note: Word not found in the current frequency source")

    def batch_lookup(self, words: List[str]):
        """Look up multiple words and compare them."""
        print(f"\n{'='*80}")
        print("BATCH WORD LOOKUP")
        print(f"{'='*80}")
        print(f"{'Word':<15} {'Count':<8} {'Frequency':<12} {'Tier':<12} {'Rank'}")
        print("-" * 80)

        results = []
        for word in words:
            count, freq, tier, rank = self.get_word_info(word)
            results.append((word, count, freq, tier, rank))
            print(f"{word:<15} {count:<8} {freq:<12.6f} {tier:<12} {rank:,}")

        return results

    def analyze_zipf_law(self):
        """Analyze how well the frequency distribution follows Zipf's law."""
        print(f"\n{'='*60}")
        print("ZIPF'S LAW ANALYSIS")
        print(f"{'='*60}")

        top_words = self.word_frequencies.most_common(20)

        print("Zipf's law prediction vs actual frequency (top 20 words):")
        print(f"{'Rank':<6} {'Word':<15} {'Actual Freq':<12} {'Zipf Pred':<12} {'Ratio'}")
        print("-" * 70)

        # Zipf's law: the frequency of the rank-r word is approximately the
        # frequency of the top word divided by r.
        baseline_freq = top_words[0][1] / self.total_words

        for i, (word, count) in enumerate(top_words):
            rank = i + 1
            actual_freq = count / self.total_words
            zipf_predicted = baseline_freq / rank
            ratio = actual_freq / zipf_predicted if zipf_predicted > 0 else 0

            print(f"{rank:<6} {word:<15} {actual_freq:<12.6f} {zipf_predicted:<12.6f} {ratio:<8.2f}")
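
    # A more general check (sketch): fit the Zipf exponent s in
    # f(rank) ≈ C / rank**s by least squares on the log-log curve; for
    # natural-language corpora s typically comes out near 1.
    #
    #   counts = np.array(
    #       [c for _, c in self.word_frequencies.most_common(1000)], dtype=float)
    #   ranks = np.arange(1, len(counts) + 1)
    #   slope, _ = np.polyfit(np.log(ranks), np.log(counts), 1)
    #   print(f"Fitted Zipf exponent s = {-slope:.2f}")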

    def plot_frequency_distribution(self):
        """Create visualizations of the frequency distribution."""
        if not HAS_MATPLOTLIB:
            print("Matplotlib not available. Skipping visualizations.")
            return

        print("\nGenerating frequency distribution plots...")

        counts = sorted(self.word_frequencies.values(), reverse=True)

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
        source_name = self.current_source.description if self.current_source else 'Unknown Source'
        fig.suptitle(f'Frequency Analysis - {source_name}', fontsize=16)

        # Panel 1: histogram of word counts (log-scaled y axis).
        ax1.hist([c for c in counts if c > 1], bins=50, alpha=0.7, edgecolor='black')
        ax1.set_xlabel('Word Count')
        ax1.set_ylabel('Number of Words')
        ax1.set_title('Word Count Distribution')
        ax1.set_yscale('log')

        # Panel 2: rank vs frequency on log-log axes (Zipf plot).
        ranks = list(range(1, min(1000, len(counts)) + 1))
        top_counts = counts[:len(ranks)]
        ax2.loglog(ranks, top_counts, 'bo-', alpha=0.7, markersize=3)
        ax2.set_xlabel('Rank')
        ax2.set_ylabel('Frequency')
        ax2.set_title("Zipf's Law (Rank vs Frequency)")
        ax2.grid(True, alpha=0.3)

        # Panel 3: word counts per tier.
        tier_counts = Counter(self.frequency_tiers.values())

        tier_order = [f"tier_{i}_{name}" for i, name in enumerate([
            "ultra_common", "extremely_common", "very_common", "highly_common",
            "common", "moderately_common", "somewhat_uncommon", "uncommon",
            "rare", "very_rare"
        ], 1)]

        tier_labels = [f"T{i}" for i in range(1, 11)]
        tier_values = [tier_counts.get(tier, 0) for tier in tier_order]

        bars = ax3.bar(tier_labels, tier_values, alpha=0.7, edgecolor='black')
        ax3.set_ylabel('Number of Words')
        ax3.set_title('10-Tier Frequency Distribution')
        ax3.set_xlabel('Frequency Tiers (T1=Ultra Common → T10=Very Rare)')

        import matplotlib.cm as cm
        colors = cm.RdYlGn_r(np.linspace(0, 1, len(bars)))
        for bar, color in zip(bars, colors):
            bar.set_color(color)

        # Panel 4: words per logarithmic relative-frequency band
        # (same bands as show_frequency_ranges).
        ranges = [
            "Ultra\nHigh", "Extremely\nHigh", "Very\nHigh", "High", "Mod.\nHigh",
            "Medium", "Mod.\nLow", "Low", "Very\nLow", "Ultra\nLow"
        ]

        conditions = [
            lambda f: f > 1e-2,
            lambda f: 1e-3 < f <= 1e-2,
            lambda f: 1e-4 < f <= 1e-3,
            lambda f: 1e-5 < f <= 1e-4,
            lambda f: 1e-6 < f <= 1e-5,
            lambda f: 1e-7 < f <= 1e-6,
            lambda f: 1e-8 < f <= 1e-7,
            lambda f: 1e-9 < f <= 1e-8,
            lambda f: 1e-10 < f <= 1e-9,
            lambda f: f <= 1e-10
        ]

        range_counts = []
        for condition in conditions:
            count = sum(1 for word, word_count in self.word_frequencies.items()
                        if condition(word_count / self.total_words))
            range_counts.append(count)

        bars4 = ax4.bar(ranges, range_counts, alpha=0.7, edgecolor='black')
        ax4.set_ylabel('Number of Words')
        ax4.set_title('Logarithmic Frequency Ranges')
        ax4.tick_params(axis='x', rotation=45)

        colors4 = cm.viridis(np.linspace(0, 1, len(bars4)))
        for bar, color in zip(bars4, colors4):
            bar.set_color(color)

        plt.tight_layout()

        plot_path = os.path.join(self.cache_dir, 'frequency_analysis.png')
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        print(f"✓ Saved plots to: {plot_path}")

        plt.show()

    def _print_commands(self):
        """Print the interactive command reference."""
        print("  lookup <word>     - Look up word frequency")
        print("  batch <w1,w2,w3>  - Look up multiple words")
        print("  top [n]           - Show top n most common words")
        print("  bottom [n]        - Show bottom n least common words")
        print("  ranges            - Show frequency range distribution")
        print("  tiers             - Show sample words by tier")
        print("  zipf              - Analyze Zipf's law")
        print("  plot              - Generate visualizations")
        print("  stats             - Show basic statistics")
        print("  sources           - List available frequency sources")
        print("  source <name>     - Switch to frequency source")
        print("  download <source> - Download/create frequency source")
        print("  compare <word>    - Compare word across sources")
        print("  help              - Show this help message")
        print("  quit              - Exit")

    def interactive_mode(self):
        """Run interactive analysis mode."""
        print(f"\n{'='*60}")
        print("INTERACTIVE FREQUENCY ANALYZER")
        print(f"{'='*60}")
        print("Commands:")
        self._print_commands()
        print("-" * 60)

        while True:
            try:
                cmd = input("\nfreq> ").strip()

                if cmd.lower() == 'quit':
                    break

                parts = cmd.split()
                if not parts:
                    continue

                command = parts[0].lower()

                if command == 'lookup' and len(parts) > 1:
                    self.lookup_word(parts[1])

                elif command == 'batch' and len(parts) > 1:
                    words = [w.strip() for w in ' '.join(parts[1:]).split(',')]
                    self.batch_lookup(words)

                elif command == 'top':
                    n = int(parts[1]) if len(parts) > 1 else 20
                    self.show_top_words(n)

                elif command == 'bottom':
                    n = int(parts[1]) if len(parts) > 1 else 20
                    self.show_bottom_words(n)

                elif command == 'ranges':
                    self.show_frequency_ranges()

                elif command == 'tiers':
                    self.show_tier_samples()

                elif command == 'zipf':
                    self.analyze_zipf_law()

                elif command == 'plot':
                    self.plot_frequency_distribution()

                elif command == 'stats':
                    print("\nBasic Statistics:")
                    print(f"  Current source: {self.current_source.name if self.current_source else 'None'}")
                    print(f"  Total word tokens: {self.total_words:,}")
                    print(f"  Unique words: {self.unique_words:,}")
                    if self.word_frequencies:
                        print(f"  Average word length: {sum(len(w) for w in self.word_frequencies) / self.unique_words:.1f}")

                        most_common_word, most_common_count = self.word_frequencies.most_common(1)[0]
                        print(f"  Most common word: '{most_common_word}' ({most_common_count:,} times)")

                elif command == 'sources':
                    self.list_sources()

                elif command == 'source' and len(parts) > 1:
                    self.switch_source(parts[1])

                elif command == 'download' and len(parts) > 1:
                    if self.download_source(parts[1]):
                        print(f"✓ Downloaded {parts[1]}. Use 'source {parts[1]}' to switch to it.")

                elif command == 'compare' and len(parts) > 1:
                    self.compare_word_across_sources(parts[1])

                elif command == 'help':
                    print(f"\n{'='*60}")
                    print("AVAILABLE COMMANDS")
                    print(f"{'='*60}")
                    self._print_commands()

                else:
                    print("Unknown command. Type 'help' for available commands or 'quit' to exit.")

            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"Error: {e}")

        print("\nGoodbye!")


def main():
    """Main entry point."""
    cache_dir = os.path.join(os.path.dirname(__file__), 'model_cache')
    if not os.path.exists(cache_dir):
        print(f"Error: Cache directory not found: {cache_dir}")
        print("Please run the thematic word generator first to create the cache.")
        sys.exit(1)

    analyzer = FrequencyAnalyzer(cache_dir)

    if not analyzer.word_frequencies:
        print("Failed to load frequency data. Exiting.")
        sys.exit(1)

    if len(sys.argv) > 1:
        command = sys.argv[1].lower()

        if command == 'stats':
            source = analyzer.current_source.name if analyzer.current_source else 'unknown'
            print(f"\nFrequency Statistics ({source}):")
            print(f"  Total word tokens: {analyzer.total_words:,}")
            print(f"  Unique words: {analyzer.unique_words:,}")

        elif command == 'top':
            n = int(sys.argv[2]) if len(sys.argv) > 2 else 20
            analyzer.show_top_words(n)

        elif command == 'bottom':
            n = int(sys.argv[2]) if len(sys.argv) > 2 else 20
            analyzer.show_bottom_words(n)

        elif command == 'ranges':
            analyzer.show_frequency_ranges()

        elif command == 'tiers':
            analyzer.show_tier_samples()

        elif command == 'zipf':
            analyzer.analyze_zipf_law()

        elif command == 'plot':
            analyzer.plot_frequency_distribution()

        elif command == 'lookup' and len(sys.argv) > 2:
            analyzer.lookup_word(sys.argv[2])

        elif command == 'interactive':
            analyzer.interactive_mode()

        elif command == 'sources':
            analyzer.list_sources()

        elif command == 'download' and len(sys.argv) > 2:
            source_name = sys.argv[2]
            if analyzer.download_source(source_name):
                print(f"✓ Downloaded {source_name}. Use 'source {source_name}' to switch to it.")

        elif command == 'source' and len(sys.argv) > 2:
            analyzer.switch_source(sys.argv[2])

        elif command == 'compare' and len(sys.argv) > 2:
            analyzer.compare_word_across_sources(sys.argv[2])

        elif command == 'help':
            print("\nAvailable commands:")
            print("  stats             - Show basic frequency statistics")
            print("  top [n]           - Show top n most common words")
            print("  bottom [n]        - Show bottom n least common words")
            print("  ranges            - Show frequency range distribution")
            print("  tiers             - Show sample words by tier")
            print("  zipf              - Analyze Zipf's law")
            print("  plot              - Generate visualizations")
            print("  sources           - List available frequency sources")
            print("  download <source> - Download/create frequency source")
            print("  source <name>     - Switch to frequency source")
            print("  compare <word>    - Compare word across sources")
            print("  lookup <word>     - Look up word frequency")
            print("  interactive       - Enter interactive mode")
            print("  help              - Show this help message")

        else:
            print("Usage: python frequency_analyzer.py "
                  "[help|stats|top|bottom|ranges|tiers|zipf|plot|sources|"
                  "download <source>|source <name>|compare <word>|lookup <word>|interactive]")

    else:
        # No arguments: print an overview, then drop into interactive mode.
        source = analyzer.current_source.name if analyzer.current_source else 'unknown'
        print(f"\nCorpus Overview ({source}):")
        print(f"  Total tokens: {analyzer.total_words:,}")
        print(f"  Unique words: {analyzer.unique_words:,}")

        analyzer.show_top_words(10)
        analyzer.show_tier_samples(5)
        analyzer.interactive_mode()


if __name__ == "__main__":
    main()