# import random # Not used, commented out
import matplotlib.pyplot as plt
import nltk
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
# from nltk.stem import WordNetLemmatizer # Not used, commented out
# from nltk.text import Text # Not used, commented out
from nltk.probability import FreqDist
from cleantext import clean
# import textract # Replaced by PyPDF2
import PyPDF2 # Added for PDF parsing
# import urllib.request # Not used, commented out
from io import BytesIO
# import sys # Not used, commented out
import pandas as pd
# import cv2 # Not used, commented out
import re
from wordcloud import WordCloud # , ImageColorGenerator # ImageColorGenerator not used, commented out
from textblob import TextBlob
from PIL import Image
import os
import gradio as gr
from dotenv import load_dotenv
import groq
# import json # Not used, commented out
import traceback
import numpy as np
import unidecode
import contractions
from sklearn.feature_extraction.text import TfidfVectorizer
# Load environment variables
load_dotenv()
import ssl  # sometimes needed to work around certificate issues during NLTK downloads
def ensure_nltk_resources():
    try:
        # Probe a couple of required resources; a LookupError means a download is needed
        nltk.data.find('tokenizers/punkt')
        nltk.data.find('corpora/stopwords')
    except LookupError:
        print("NLTK resources not found. Downloading...")
        try:
            # Work around SSL certificate verification failures (common on some systems)
            _create_unverified_https_context = ssl._create_unverified_context
        except AttributeError:
            pass
        else:
            ssl._create_default_https_context = _create_unverified_https_context
        nltk.download(['stopwords', 'wordnet', 'words', 'punkt', 'punkt_tab'])
        print("NLTK resources downloaded successfully.")
ensure_nltk_resources()
# Initialize Groq client
groq_api_key = os.getenv("GROQ_API_KEY")
groq_client = groq.Groq(api_key=groq_api_key) if groq_api_key else None
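# NOTE: the key is read from the environment; one way to provide it is a .env file
# in the project root containing a line like: GROQ_API_KEY=<your-key>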
# Stopwords customization
stop_words = set(stopwords.words('english'))
stop_words.update({'ask', 'much', 'thank', 'etc.', 'e', 'We', 'In', 'ed', 'pa', 'This', 'also', 'A', 'fu', 'To', '5', 'ing', 'er', '2'})  # domain-specific noise and PDF-extraction fragments
# --- Parsing & Preprocessing Functions ---
# --- Replaced textract with PyPDF2 ---
def Parsing(parsed_text):
"""
Parses text from a PDF file using PyPDF2.
"""
try:
# Get the file path from the Gradio UploadFile object
if hasattr(parsed_text, 'name'):
file_path = parsed_text.name
else:
# Fallback if it's somehow just a string path
file_path = parsed_text
        # Use PyPDF2 to read the PDF
        text = ""
        with open(file_path, 'rb') as pdf_file:  # open in binary read mode
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            for page in pdf_reader.pages:
                # extract_text() can return None for pages with no extractable text
                text += (page.extract_text() or "") + "\n"  # newline between pages
# Clean the extracted text
return clean(text)
    except FileNotFoundError:
        print(f"Error parsing PDF: File not found at path: {file_path}")
        return "Error parsing PDF: File not found. Please check the file upload."
    except PyPDF2.errors.PdfReadError as pre:
        print(f"Error reading PDF: {pre}")
        return "Error reading PDF: The file might be corrupted or password-protected."
except Exception as e:
print(f"Error parsing PDF: {e}")
return f"Error parsing PDF: {e}"
def clean_text(text):
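    """
    Normalizes raw extracted text: transliterates to ASCII, expands contractions,
    collapses whitespace, and drops stopwords (including the custom additions above).
    """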
    # Transliterate Unicode to its closest ASCII equivalent first, then strip anything left over
    # (stripping before transliterating would discard accented characters entirely)
    text = unidecode.unidecode(text)
    text = text.encode("ascii", errors="ignore").decode("ascii")
    text = contractions.fix(text)
    text = re.sub(r"[\n\t]", " ", text)
    text = re.sub(r"/ ", " ", text)
    text = re.sub(" +", " ", text).strip()
    text = [word for word in text.split() if word not in stop_words]
return ' '.join(text)
def Preprocess(textParty):
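    """
    Strips non-alphanumeric characters and removes English stopwords,
    leaving space-separated tokens for the frequency and TF-IDF analyses.
    """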
    # Lowercase so the (lowercase) stopword pattern also matches capitalized forms
    text1Party = re.sub('[^A-Za-z0-9]+', ' ', textParty).lower()
    pattern = re.compile(r'\b(' + r'|'.join(stopwords.words('english')) + r')\b\s*')
    text2Party = pattern.sub('', text1Party)
return text2Party
# --- Core Analysis Functions ---
def generate_summary(text):
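    """
    Asks the Groq chat API for a 300-500 word summary of the manifesto.
    Input is truncated to 15,000 characters; returns an explanatory string
    if the API key is missing or the call fails.
    """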
if not groq_client:
return "Summarization is not available. Please set up your GROQ_API_KEY in the .env file."
# Adjusted truncation length for potentially better summary context
if len(text) > 15000:
text = text[:15000]
try:
completion = groq_client.chat.completions.create(
model="llama3-8b-8192", # Or your preferred model
messages=[
{"role": "system", "content": "You are a helpful assistant that summarizes political manifestos. Provide a concise, objective summary that captures the key policy proposals, themes, and promises in the manifesto."},
{"role": "user", "content": f"Please summarize the following political manifesto text in about 300-500 words, focusing on the main policy areas, promises, and themes:\n{text}"}
],
temperature=0.3,
max_tokens=800
)
return completion.choices[0].message.content
except Exception as e:
return f"Error generating summary: {str(e)}"
# --- New LLM-based Search Function ---
def get_contextual_search_result(target_word, tar_passage, groq_client_instance, max_context_length=8000):
"""
Uses the LLM to provide contextual information about the target word within the passage.
"""
if not target_word or target_word.strip() == "":
return "Please enter a search term."
if not groq_client_instance:
return "Contextual search requires the LLM API. Please set up your GROQ_API_KEY."
# Truncate passage if too long for the model/context window
original_length = len(tar_passage)
if original_length > max_context_length:
tar_passage_truncated = tar_passage[:max_context_length]
print(f"Warning: Passage truncated for LLM search context from {original_length} to {max_context_length} characters.")
else:
tar_passage_truncated = tar_passage
# --- Improved Prompt ---
prompt = f"""
You are an expert political analyst. You have been given a section of a political manifesto and a specific search term.
Your task is to extract and summarize all information related to the search term from the provided text.
Focus on:
1. Specific policies, promises, or statements related to the term.
2. The context in which the term is used.
3. Any key details, figures, or commitments mentioned.
Present your findings concisely. If the term is not relevant or not found in the provided text section, state that clearly.
Search Term: {target_word}
Manifesto Text Section:
{tar_passage_truncated}
Relevant Information:
"""
try:
completion = groq_client_instance.chat.completions.create(
model="llama3-8b-8192", # Use the same or a suitable model
messages=[
{"role": "system", "content": "You are a helpful assistant skilled at analyzing political texts and extracting relevant information based on a search query. Provide clear, concise summaries."},
{"role": "user", "content": prompt}
],
temperature=0.2, # Low temperature for more factual extraction
max_tokens=1000 # Adjust based on expected output length
)
result = completion.choices[0].message.content.strip()
# Add a note if the input was truncated
if original_length > max_context_length:
result = f"(Note: Analysis based on the first {max_context_length} characters of the manifesto.)\n\n" + result
return result if result else f"No specific context for '{target_word}' could be generated from the provided text section."
except Exception as e:
error_msg = f"Error during contextual search for '{target_word}': {str(e)}"
print(error_msg)
traceback.print_exc()
        return error_msg
def fDistance(text2Party):
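    """
    Ranks the top 10 topic words by a weighted blend of raw frequency (30%)
    and average per-sentence TF-IDF (70%), each normalized by its maximum.
    The result is scaled so the scores sum to 1 (see normalize below).
    """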
word_tokens_party = word_tokenize(text2Party)
fdistance = FreqDist(word_tokens_party).most_common(10)
mem = {x[0]: x[1] for x in fdistance}
vectorizer = TfidfVectorizer(max_features=15, stop_words='english')
try:
tfidf_matrix = vectorizer.fit_transform(sent_tokenize(text2Party))
feature_names = vectorizer.get_feature_names_out()
        # Average TF-IDF score for each feature across all sentences
        # (the mean over the sparse matrix counts sentences where the word is absent as 0)
        mean_scores = np.asarray(tfidf_matrix.mean(axis=0)).flatten()
        tfidf_scores = dict(zip(feature_names, mean_scores))
combined_scores = {}
all_words = set(list(mem.keys()) + list(tfidf_scores.keys()))
max_freq = max(mem.values()) if mem else 1
max_tfidf = max(tfidf_scores.values()) if tfidf_scores else 1
for word in all_words:
freq_score = mem.get(word, 0) / max_freq
tfidf_score = tfidf_scores.get(word, 0) / max_tfidf
combined_scores[word] = (freq_score * 0.3) + (tfidf_score * 0.7)
top_words = dict(sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)[:10])
return normalize(top_words)
except ValueError as ve: # Handle case where TF-IDF fails (e.g., empty after processing)
print(f"Warning: TF-IDF failed, using only frequency: {ve}")
# Fallback to just normalized frequency if TF-IDF fails
if mem:
max_freq = max(mem.values())
return {k: v / max_freq for k, v in list(mem.items())[:10]} # Return top 10 freq, normalized
else:
return {}
def normalize(d, target=1.0):
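    """
    Scales the dict's values so they sum to `target`,
    e.g. normalize({'a': 3, 'b': 1}) -> {'a': 0.75, 'b': 0.25}.
    """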
raw = sum(d.values())
factor = target / raw if raw != 0 else 0
return {key: value * factor for key, value in d.items()}
# --- Visualization Functions with Error Handling ---
# --- Improved safe_plot to handle apply_aspect errors ---
def safe_plot(func, *args, **kwargs):
"""Executes a plotting function and returns the image, handling errors."""
buf = None # Initialize buffer
try:
# Ensure a clean figure state
fig = plt.figure() # Create a new figure explicitly
func(*args, **kwargs)
buf = BytesIO()
# Try saving with bbox_inches, but catch potential apply_aspect error
try:
plt.savefig(buf, format='png', bbox_inches='tight')
except AttributeError as ae:
if "apply_aspect" in str(ae):
print(f"Warning: bbox_inches='tight' failed ({ae}), saving without it.")
buf.seek(0) # Reset buffer as it might be partially written
buf = BytesIO() # Get a fresh buffer
plt.savefig(buf, format='png') # Save without bbox_inches
else:
raise # Re-raise if it's a different AttributeError
buf.seek(0)
img = Image.open(buf)
plt.close(fig) # Explicitly close the specific figure
return img
except Exception as e:
print(f"Plotting error in safe_plot: {e}")
if buf:
buf.close() # Ensure buffer is closed on error if it was created
traceback.print_exc()
# Try to return a placeholder or None
plt.close('all') # Aggressive close on error
return None
def fDistancePlot(text2Party):
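    """Plots the 15 most frequent tokens as an NLTK frequency-distribution chart."""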
def plot_func():
tokens = word_tokenize(text2Party)
if not tokens:
plt.text(0.5, 0.5, "No data to plot", ha='center', va='center')
return
fdist = FreqDist(tokens)
fdist.plot(15, title='Frequency Distribution')
plt.xticks(rotation=45, ha='right') # Rotate x-axis labels if needed
plt.tight_layout()
return safe_plot(plot_func)
def DispersionPlot(textParty):
"""
Generates a dispersion plot using Matplotlib.
Shows the positions of the most common words along the text.
"""
buf = None
try:
word_tokens_party = word_tokenize(textParty.lower()) # Lowercase for matching
print(f"Debug DispersionPlot: Total tokens: {len(word_tokens_party)}")
if not word_tokens_party:
print("Warning: No tokens found for dispersion plot.")
return None
fdistance = FreqDist(word_tokens_party)
print(f"Debug DispersionPlot: FreqDist sample: {list(fdistance.most_common(10))}")
# --- Improved word selection logic ---
common_words_raw = fdistance.most_common(15)
        # Filter words: longer than 2 characters and purely alphabetic
        # (str.isalpha() already excludes digit-only tokens)
        common_words_filtered = [
            (word, freq) for word, freq in common_words_raw
            if len(word) > 2 and word.isalpha()
        ]
print(f"Debug DispersionPlot: Filtered common words: {common_words_filtered}")
# Select top 5 from filtered list
final_word_list = [word for word, _ in common_words_filtered[:5]]
print(f"Debug DispersionPlot: Final word list for plot: {final_word_list}")
if not final_word_list:
print("Warning: No suitable words found for dispersion plot.")
# Create a simple plot indicating no data
fig, ax = plt.subplots(figsize=(8, 3))
ax.text(0.5, 0.5, "No suitable words found for dispersion plot", ha='center', va='center', transform=ax.transAxes)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
fig.suptitle('Dispersion Plot')
else:
# --- Create the dispersion plot manually ---
fig, ax = plt.subplots(figsize=(12, 6))
            # Each word gets a horizontal band; a scatter point marks every occurrence,
            # with the x-axis giving the token position in the text
            colors = plt.cm.tab10.colors  # fixed qualitative palette; plt.cm.get_cmap(name, n) is deprecated in recent Matplotlib
            for i, word in enumerate(final_word_list):
                # Find all token indices where the word occurs
                offsets = [j for j, token in enumerate(word_tokens_party) if token == word]
                y_positions = [i + 1] * len(offsets)
                ax.scatter(offsets, y_positions, label=word, color=colors[i % len(colors)], alpha=0.7, s=30)  # s is marker size
ax.set_xlabel("Position in Text (Token Index)")
ax.set_ylabel("Words")
ax.set_title("Dispersion Plot")
# Set y-ticks to correspond to the words
ax.set_yticks(range(1, len(final_word_list) + 1))
ax.set_yticklabels(final_word_list)
# Invert y-axis so the first word in the list is at the top
ax.invert_yaxis()
# Add grid for better readability
ax.grid(True, axis='x', linestyle='--', alpha=0.5)
            # No legend needed: the y-tick labels already name the words
plt.tight_layout()
buf = BytesIO()
# Handle potential apply_aspect error
try:
fig.savefig(buf, format='png', bbox_inches='tight', dpi=150) # Added dpi for clarity
except AttributeError as ae:
if "apply_aspect" in str(ae):
print(f"Warning: bbox_inches='tight' failed for Dispersion Plot ({ae}), saving without it.")
buf.seek(0)
buf = BytesIO()
fig.savefig(buf, format='png', dpi=150)
else:
raise
buf.seek(0)
img = Image.open(buf)
plt.close(fig)
return img
except Exception as e:
print(f"Dispersion plot error: {e}")
if buf:
buf.close()
traceback.print_exc()
plt.close('all')
return None
def word_cloud_generator(parsed_text_name, text_Party):
"""Generates the word cloud image."""
buf = None # Initialize buffer
try:
# Handle case where parsed_text_name might not have .name
filename_lower = ""
if hasattr(parsed_text_name, 'name') and parsed_text_name.name:
filename_lower = parsed_text_name.name.lower()
elif isinstance(parsed_text_name, str):
filename_lower = parsed_text_name.lower()
mask_path = None
if 'bjp' in filename_lower:
mask_path = 'bjpImg2.jpeg'
elif 'congress' in filename_lower:
mask_path = 'congress3.jpeg'
elif 'aap' in filename_lower:
mask_path = 'aapMain2.jpg'
if text_Party.strip() == "":
raise ValueError("Text for word cloud is empty")
# Generate word cloud object
if mask_path and os.path.exists(mask_path):
orgImg = Image.open(mask_path)
if orgImg.mode != 'RGB':
orgImg = orgImg.convert('RGB')
mask = np.array(orgImg)
wordcloud = WordCloud(max_words=3000, mask=mask, background_color='white', mode='RGBA').generate(text_Party) # Added mode='RGBA'
else:
wordcloud = WordCloud(max_words=2000, background_color='white', mode='RGBA').generate(text_Party)
# --- Key Fix: Explicitly manage figure and axes for word cloud ---
fig, ax = plt.subplots(figsize=(8, 6)) # Create new figure and axes
ax.imshow(wordcloud, interpolation='bilinear')
ax.axis("off")
fig.tight_layout(pad=0) # Remove padding
buf = BytesIO()
# Handle potential apply_aspect error for word cloud too
try:
fig.savefig(buf, format='png', bbox_inches='tight', dpi=150, facecolor='white') # Added dpi and facecolor
except AttributeError as ae:
if "apply_aspect" in str(ae):
print(f"Warning: bbox_inches='tight' failed for Word Cloud ({ae}), saving without it.")
buf.seek(0)
buf = BytesIO()
fig.savefig(buf, format='png', dpi=150, facecolor='white')
else:
raise
buf.seek(0)
img = Image.open(buf)
plt.close(fig) # Close the specific figure
return img
except Exception as e:
print(f"Word cloud error: {e}")
if buf:
buf.close() # Ensure buffer is closed on error
traceback.print_exc()
plt.close('all') # Aggressive close on error
return None # Return None on error
# --- Main Analysis Function ---
def analysis(Manifesto, Search):
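    """
    End-to-end pipeline for one uploaded manifesto. Returns an 8-tuple in the
    order the Gradio outputs expect: (search results, key topics, sentiment plot,
    subjectivity plot, word cloud, frequency plot, dispersion plot, summary).
    """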
try:
if Manifesto is None:
return "No file uploaded", {}, None, None, None, None, None, "No file uploaded"
if Search.strip() == "":
Search = "government"
raw_party = Parsing(Manifesto) # Uses PyPDF2 now
if isinstance(raw_party, str) and raw_party.startswith("Error"):
return raw_party, {}, None, None, None, None, None, "Parsing failed"
text_Party = clean_text(raw_party)
text_Party_processed = Preprocess(text_Party)
# --- Perform Search FIRST using the ORIGINAL text for better context ---
# Use the new LLM-based search function
searChRes = get_contextual_search_result(Search, raw_party, groq_client)
        # Summarize the raw (uncleaned) text so the model sees the fullest context
        summary = generate_summary(raw_party)
# --- Sentiment Analysis ---
if not text_Party_processed.strip():
# Handle empty text after processing
df_dummy = pd.DataFrame({'Polarity_Label': ['Neutral'], 'Subjectivity_Label': ['Low']})
polarity_val = 0.0
subjectivity_val = 0.0
else:
polarity_val = TextBlob(text_Party_processed).sentiment.polarity
subjectivity_val = TextBlob(text_Party_processed).sentiment.subjectivity
polarity_label = 'Positive' if polarity_val > 0 else 'Negative' if polarity_val < 0 else 'Neutral'
subjectivity_label = 'High' if subjectivity_val > 0.5 else 'Low'
df_dummy = pd.DataFrame({'Polarity_Label': [polarity_label], 'Subjectivity_Label': [subjectivity_label]})
# --- Generate Plots with Safe Plotting ---
# Pass the potentially empty text and handle inside plotting functions
sentiment_plot = safe_plot(lambda: df_dummy['Polarity_Label'].value_counts().plot(kind='bar', color="#FF9F45", title='Sentiment Analysis'))
subjectivity_plot = safe_plot(lambda: df_dummy['Subjectivity_Label'].value_counts().plot(kind='bar', color="#B667F1", title='Subjectivity Analysis'))
freq_plot = fDistancePlot(text_Party_processed)
        dispersion_plot = DispersionPlot(text_Party_processed)
        wordcloud = word_cloud_generator(Manifesto, text_Party_processed)  # pass the upload object so the filename can select a party mask
fdist_Party = fDistance(text_Party_processed)
# searChRes is now generated earlier using LLM
return searChRes, fdist_Party, sentiment_plot, subjectivity_plot, wordcloud, freq_plot, dispersion_plot, summary
except Exception as e:
error_msg = f"Critical error in analysis function: {str(e)}"
print(error_msg)
traceback.print_exc()
# Return error messages/images in the correct order
return error_msg, {}, None, None, None, None, None, "Analysis failed"
# --- Gradio Interface ---
# Use Blocks for custom layout
with gr.Blocks(title='Manifesto Analysis') as demo:
gr.Markdown("# Manifesto Analysis")
# Input Section
with gr.Row():
with gr.Column(scale=1): # Adjust scale if needed
file_input = gr.File(label="Upload Manifesto PDF", file_types=[".pdf"])
with gr.Column(scale=1):
search_input = gr.Textbox(label="Search Term", placeholder="Enter a term to search in the manifesto")
submit_btn = gr.Button("Analyze Manifesto", variant='primary') # Make button prominent
# Output Section using Tabs
with gr.Tabs():
# --- Summary Tab ---
with gr.TabItem("Summary"):
summary_output = gr.Textbox(label='AI-Generated Summary', lines=10, interactive=False)
# --- Search Results Tab (uses LLM output now) ---
with gr.TabItem("Search Results"):
search_output = gr.Textbox(label='Context Based Search Results', lines=15, interactive=False, max_lines=20) # Increased lines/max_lines
# --- Key Topics Tab ---
with gr.TabItem("Key Topics"):
                topics_output = gr.Label(label="Most Relevant Topics (Frequency + TF-IDF)", num_top_classes=10)  # show top 10
# --- Visualizations Tab ---
with gr.TabItem("Visualizations"):
# Use Rows and Columns for better arrangement
with gr.Row(): # Row 1: Sentiment & Subjectivity
with gr.Column():
sentiment_output = gr.Image(label='Sentiment Analysis', interactive=False, height=400) # Set height
with gr.Column():
subjectivity_output = gr.Image(label='Subjectivity Analysis', interactive=False, height=400)
with gr.Row(): # Row 2: Word Cloud & Frequency
with gr.Column():
wordcloud_output = gr.Image(label='Word Cloud', interactive=False, height=400)
with gr.Column():
freq_output = gr.Image(label='Frequency Distribution', interactive=False, height=400)
with gr.Row(): # Row 3: Dispersion Plot (Full width)
with gr.Column():
dispersion_output = gr.Image(label='Dispersion Plot', interactive=False, height=400) # Adjust height as needed
# --- Link Button Click to Function and Outputs ---
submit_btn.click(
fn=analysis,
inputs=[file_input, search_input],
outputs=[
search_output, # 1 (Now contextual LLM output)
topics_output, # 2
sentiment_output, # 3
subjectivity_output, # 4
wordcloud_output, # 5
freq_output, # 6
dispersion_output, # 7
summary_output # 8
],
concurrency_limit=1 # Limit concurrent analyses if needed
)
# --- Examples ---
gr.Examples(
examples=[
["Example/AAP_Manifesto_2019.pdf", "government"],
["Example/Bjp_Manifesto_2019.pdf", "environment"],
["Example/Congress_Manifesto_2019.pdf", "safety"]
],
inputs=[file_input, search_input],
outputs=[search_output, topics_output, sentiment_output, subjectivity_output, wordcloud_output, freq_output, dispersion_output, summary_output], # Link examples to outputs
fn=analysis # Run analysis on example click
)
if __name__ == "__main__":
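    # `python app.py` launches the UI locally (Gradio serves on http://127.0.0.1:7860 by default)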
demo.launch(debug=True, share=False, show_error=True)