import io
import os
from typing import List, Tuple, Union

import gradio as gr
import nltk

# --- NLTK resources (cover both old & new names) -----------------------------
NLTK_PACKAGES = [
    # Tokenizers
    "punkt", "punkt_tab",
    # Stopwords / Lemmas
    "stopwords", "wordnet", "omw-1.4",
    # POS taggers (old and new English-specific)
    "averaged_perceptron_tagger", "averaged_perceptron_tagger_eng",
    # NE chunkers (old and new)
    "maxent_ne_chunker", "maxent_ne_chunker_tab",
    # Word lists used by the NE chunker
    "words",
]

def ensure_nltk_resources() -> str:
    messages = []
    for pkg in NLTK_PACKAGES:
        try:
            # nltk.download() is effectively idempotent: packages already on disk are skipped.
            nltk.download(pkg, quiet=True)
            messages.append(f"OK: {pkg}")
        except Exception as e:
            messages.append(f"Failed {pkg}: {e}")
    return " | ".join(messages) if messages else "Resources checked."
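
# Optional variant (not used above): skip the download call entirely when a resource is
# already installed, using nltk.data.find(). A minimal sketch, assuming the standard NLTK
# resource paths (e.g. "tokenizers/punkt", "corpora/stopwords"); the helper name is ours.
def _is_installed(resource_path: str) -> bool:
    """Return True if an NLTK resource such as 'tokenizers/punkt' is already on disk."""
    try:
        nltk.data.find(resource_path)
        return True
    except LookupError:
        return False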

# These imports only require the NLTK code, not the downloaded corpora; the data is looked
# up lazily at call time, so importing here is safe even before ensure_nltk_resources() runs.
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk import pos_tag
from nltk.chunk import ne_chunk

# --- Helpers -----------------------------------------------------------------

def _read_bytes(path: str) -> bytes:
    with open(path, "rb") as f:
        return f.read()

def read_file(upload: Union[str, gr.File]) -> str:
    """
    Read text from Gradio's File input. Supports .txt and .docx.
    Works whether `upload` is a path (str) or a file-like object with .name/.read().
    """
    if upload is None:
        return ""

    # Normalize to a filename (for the extension) plus raw bytes.
    if isinstance(upload, str):
        path = upload
        name = os.path.basename(path)
        ext = os.path.splitext(name)[1].lower()
        content = _read_bytes(path)
    else:
        # Gradio may pass a tempfile-like object whose .name is the on-disk path.
        name = getattr(upload, "name", "") or ""
        path = name or None
        ext = os.path.splitext(name)[1].lower()
        try:
            # Some environments require reading from disk instead of .read().
            if path and os.path.exists(path):
                content = _read_bytes(path)
            else:
                content = upload.read()
        except Exception:
            # Last resort: try the path again.
            if path and os.path.exists(path):
                content = _read_bytes(path)
            else:
                return "ERROR: Could not read uploaded file."

    if ext == ".txt":
        # Try UTF-8 and UTF-16 first; latin-1 never raises, so it acts as the final fallback.
        for enc in ("utf-8", "utf-16", "latin-1"):
            try:
                return content.decode(enc)
            except UnicodeDecodeError:
                continue
        return "ERROR: Could not decode text file. Try UTF-8 or plain text."

    if ext == ".docx":
        try:
            import docx  # python-docx
        except ImportError:
            return "ERROR: python-docx not installed. Add 'python-docx' to requirements.txt."
        doc = docx.Document(io.BytesIO(content))
        return "\n".join(p.text for p in doc.paragraphs)

    return f"Unsupported file type: {ext}. Please upload .txt or .docx."
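
# Runtime dependencies used above: gradio, nltk, and python-docx for .docx support.
# A minimal requirements.txt might look like this (exact pins are an assumption, not
# taken from the original Space):
#
#   gradio
#   nltk
#   python-docx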

def extract_ner(ne_tree) -> List[Tuple[str, str]]:
    entities = []
    for subtree in ne_tree:
        # Named entities appear as labelled subtrees; plain tokens are (word, tag) tuples.
        if hasattr(subtree, "label"):
            label = subtree.label()
            text = " ".join(token for token, _ in subtree.leaves())
            entities.append((text, label))
    return entities
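
# Illustrative only (exact output depends on the NLTK model version): for the default
# sample text below, this typically yields pairs such as ("Barack Obama", "PERSON"),
# ("United States", "GPE"), and ("Washington", "GPE").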

# --- Core processing ----------------------------------------------------------

def process_text(raw_text: str, steps: List[str]) -> str:
    if not raw_text or raw_text.strip() == "":
        return "⚠️ No text provided."

    # Make sure required resources exist (quietly).
    ensure_nltk_resources()

    report_lines = []
    text = raw_text
    tokens = None
    filtered_tokens = None
    stemmed_tokens = None
    lemmatized_tokens = None
    pos_tags_val = None

    # 1) Tokenization (also needed by every later step)
    if "Tokenize text." in steps or any(
        s in steps for s in [
            "Remove stopwords.", "Stem words.", "Lemmatize words.",
            "Tag parts of speech.", "Extract named entities.",
        ]
    ):
        tokens = word_tokenize(text)
        if "Tokenize text." in steps:
            report_lines.append("### Tokens")
            report_lines.append(f"`{tokens}`\n")

    # 2) Stopword removal
    filtered_tokens = tokens
    if "Remove stopwords." in steps:
        sw = set(stopwords.words("english"))
        filtered_tokens = [w for w in (tokens or []) if w.lower() not in sw]
        report_lines.append("### After Stopword Removal")
        report_lines.append(f"`{filtered_tokens}`\n")

    # 3) Stemming
    stemmed_tokens = filtered_tokens
    if "Stem words." in steps:
        stemmer = PorterStemmer()
        stemmed_tokens = [stemmer.stem(w) for w in (filtered_tokens or [])]
        report_lines.append("### Stemmed Tokens (Porter)")
        report_lines.append(f"`{stemmed_tokens}`\n")

    # 4) Lemmatization (run on the filtered tokens so lemmas and stems stay comparable)
    lemmatized_tokens = stemmed_tokens if stemmed_tokens is not None else filtered_tokens
    if "Lemmatize words." in steps:
        lemmatizer = WordNetLemmatizer()
        # For POS-aware lemmas, pass pos=... after tagging (see the optional helper below).
        lemmatized_tokens = [lemmatizer.lemmatize(w) for w in (filtered_tokens or [])]
        report_lines.append("### Lemmatized Tokens (WordNet)")
        report_lines.append(f"`{lemmatized_tokens}`\n")

    # 5) POS tagging
    if "Tag parts of speech." in steps or "Extract named entities." in steps:
        base_for_tagging = lemmatized_tokens if lemmatized_tokens is not None else (tokens or [])
        pos_tags_val = pos_tag(base_for_tagging)
        if "Tag parts of speech." in steps:
            report_lines.append("### Part-of-Speech Tags")
            rows = ["| Token | POS |", "|---|---|"]
            rows += [f"| {t} | {p} |" for (t, p) in pos_tags_val]
            report_lines.append("\n".join(rows) + "\n")

    # 6) Named-entity recognition
    if "Extract named entities." in steps:
        if not pos_tags_val:
            base_for_tagging = lemmatized_tokens if lemmatized_tokens is not None else (tokens or [])
            pos_tags_val = pos_tag(base_for_tagging)
        ne_tree = ne_chunk(pos_tags_val, binary=False)
        ner_pairs = extract_ner(ne_tree)
        report_lines.append("### Named Entities")
        if ner_pairs:
            rows = ["| Entity | Label |", "|---|---|"]
            rows += [f"| {ent} | {lbl} |" for (ent, lbl) in ner_pairs]
            report_lines.append("\n".join(rows) + "\n")
        else:
            report_lines.append("_No named entities found._\n")

    return "\n".join(report_lines).strip() or "No steps selected."
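
# Optional (not wired into process_text above): POS-aware lemmatization, as hinted at in
# step 4. A minimal sketch assuming the Penn Treebank tags produced by nltk.pos_tag; the
# helper names here are ours, not part of the original app.
from nltk.corpus import wordnet

def _wordnet_pos(treebank_tag: str) -> str:
    """Map a Penn Treebank tag to the WordNet POS constant expected by WordNetLemmatizer."""
    if treebank_tag.startswith("J"):
        return wordnet.ADJ
    if treebank_tag.startswith("V"):
        return wordnet.VERB
    if treebank_tag.startswith("R"):
        return wordnet.ADV
    return wordnet.NOUN  # default, same as lemmatize() without pos=

def lemmatize_with_pos(tokens: List[str]) -> List[str]:
    """Lemmatize each token using its tagged part of speech instead of the noun default."""
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(tok, pos=_wordnet_pos(tag)) for tok, tag in pos_tag(tokens)]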

# --- Gradio UI ---------------------------------------------------------------
MENU = [
    "Install and download required resources.",
    "Tokenize text.",
    "Remove stopwords.",
    "Stem words.",
    "Lemmatize words.",
    "Tag parts of speech.",
    "Extract named entities.",
]

DEFAULT_TEXT = (
    "NLTK is a powerful library for text processing. "
    "Barack Obama served as the 44th President of the United States and lived in Washington, D.C."
)

with gr.Blocks(title="NLTK Text Processing Toolkit") as demo:
    gr.Markdown("# NLTK Text Processing Toolkit")
    gr.Markdown(
        "Type or paste text, or drop a `.txt`/`.docx` file. "
        "Select steps and click **Process**. Use **Install/Download Resources** if needed."
    )

    with gr.Row():
        with gr.Column():
            text_in = gr.Textbox(
                label="Text Input",
                lines=10,
                value=DEFAULT_TEXT,
                placeholder="Type or paste text here...",
            )
            file_in = gr.File(
                label="...or drop a .txt / .docx file",
                file_types=[".txt", ".docx"],
            )
            steps_in = gr.CheckboxGroup(
                choices=MENU,
                value=[
                    "Tokenize text.",
                    "Remove stopwords.",
                    "Lemmatize words.",
                    "Tag parts of speech.",
                    "Extract named entities.",
                ],
                label="Menu (choose one or more)",
            )
            with gr.Row():
                install_btn = gr.Button("Install/Download Resources")
                process_btn = gr.Button("Process", variant="primary")
        with gr.Column():
            status_out = gr.Textbox(label="Status / Logs", interactive=False)
            result_out = gr.Markdown(label="Results")

    # Button callbacks
    def on_install():
        try:
            return ensure_nltk_resources()
        except Exception as e:
            return f"Install error: {e}"

    def on_process(text, file, steps):
        try:
            # Prefer typed text; fall back to the uploaded file when the textbox is empty.
            text = (text or "").strip()
            file_text = read_file(file) if file is not None else ""
            if not text and file_text:
                if file_text.startswith("ERROR:") or file_text.startswith("Unsupported file type:"):
                    return file_text
                text = file_text
            return process_text(text, steps or [])
        except Exception:
            # Surface Python exceptions in the UI so a failure never looks like "nothing happened".
            import traceback
            return "### Error\n```\n" + traceback.format_exc() + "\n```"

    install_btn.click(fn=on_install, inputs=None, outputs=status_out)
    process_btn.click(fn=on_process, inputs=[text_in, file_in, steps_in], outputs=result_out)

    # Pre-download on load so the first click never fails silently.
    demo.load(ensure_nltk_resources, inputs=None, outputs=status_out)


if __name__ == "__main__":
    demo.launch()