import random import matplotlib.pyplot as plt import nltk from nltk.tokenize import word_tokenize, sent_tokenize from nltk.corpus import stopwords # from nltk.stem import WordNetLemmatizer # Not used, commented out from nltk.text import Text from nltk.probability import FreqDist from cleantext import clean # import textract # Replaced by PyPDF2 import PyPDF2 # Added for PDF parsing import urllib.request from io import BytesIO import sys import pandas as pd # import cv2 # Not used, commented out import re from wordcloud import WordCloud # , ImageColorGenerator # ImageColorGenerator not used, commented out from textblob import TextBlob from PIL import Image import os import gradio as gr from dotenv import load_dotenv import groq import json import traceback import numpy as np import unidecode import contractions from sklearn.feature_extraction.text import TfidfVectorizer # Load environment variables load_dotenv() # Download NLTK resources (Ensure this runs once or handle caching) # nltk.download(['stopwords', 'wordnet', 'words']) # nltk.download('punkt') # nltk.download('punkt_tab') # Initialize Groq client groq_api_key = os.getenv("GROQ_API_KEY") groq_client = groq.Groq(api_key=groq_api_key) if groq_api_key else None # Stopwords customization stop_words = set(stopwords.words('english')) stop_words.update({'ask', 'much', 'thank', 'etc.', 'e', 'We', 'In', 'ed', 'pa', 'This', 'also', 'A', 'fu', 'To', '5', 'ing', 'er', '2'}) # Ensure stop_words is a set # --- Parsing & Preprocessing Functions --- # --- Replaced textract with PyPDF2 --- def Parsing(parsed_text): """ Parses text from a PDF file using PyPDF2. """ try: # Get the file path from the Gradio UploadFile object if hasattr(parsed_text, 'name'): file_path = parsed_text.name else: # Fallback if it's somehow just a string path file_path = parsed_text # Use PyPDF2 to read the PDF text = "" with open(file_path, 'rb') as pdf_file: # Open in binary read mode pdf_reader = PyPDF2.PdfReader(pdf_file) for page_num in range(len(pdf_reader.pages)): page = pdf_reader.pages[page_num] text += page.extract_text() + "\n" # Add newline between pages # Clean the extracted text return clean(text) except FileNotFoundError: print(f"Error parsing PDF: File not found at path: {file_path}") return f"Error parsing PDF: File not found. Please check the file upload." except PyPDF2.errors.PdfReadError as pre: print(f"Error reading PDF: {pre}") return f"Error reading PDF: The file might be corrupted or password-protected." except Exception as e: print(f"Error parsing PDF: {e}") return f"Error parsing PDF: {e}" def clean_text(text): text = text.encode("ascii", errors="ignore").decode("ascii") text = unidecode.unidecode(text) text = contractions.fix(text) text = re.sub(r"\n", " ", text) text = re.sub(r"\t", " ", text) text = re.sub(r"/ ", " ", text) text = text.strip() text = re.sub(" +", " ", text).strip() text = [word for word in text.split() if word not in stop_words] return ' '.join(text) def Preprocess(textParty): text1Party = re.sub('[^A-Za-z0-9]+', ' ', textParty) pattern = re.compile(r'\b(' + r'|'.join(stopwords.words('english')) + r')\b\s*') text2Party = pattern.sub('', text1Party) return text2Party # --- Core Analysis Functions --- def generate_summary(text): if not groq_client: return "Summarization is not available. Please set up your GROQ_API_KEY in the .env file." # Adjusted truncation length for potentially better summary context if len(text) > 15000: text = text[:15000] try: completion = groq_client.chat.completions.create( model="llama3-8b-8192", # Or your preferred model messages=[ {"role": "system", "content": "You are a helpful assistant that summarizes political manifestos. Provide a concise, objective summary that captures the key policy proposals, themes, and promises in the manifesto."}, {"role": "user", "content": f"Please summarize the following political manifesto text in about 300-500 words, focusing on the main policy areas, promises, and themes:\n{text}"} ], temperature=0.3, max_tokens=800 ) return completion.choices[0].message.content except Exception as e: return f"Error generating summary: {str(e)}" # --- New LLM-based Search Function --- def get_contextual_search_result(target_word, tar_passage, groq_client_instance, max_context_length=8000): """ Uses the LLM to provide contextual information about the target word within the passage. """ if not target_word or target_word.strip() == "": return "Please enter a search term." if not groq_client_instance: return "Contextual search requires the LLM API. Please set up your GROQ_API_KEY." # Truncate passage if too long for the model/context window original_length = len(tar_passage) if original_length > max_context_length: tar_passage_truncated = tar_passage[:max_context_length] print(f"Warning: Passage truncated for LLM search context from {original_length} to {max_context_length} characters.") else: tar_passage_truncated = tar_passage # --- Improved Prompt --- prompt = f""" You are an expert political analyst. You have been given a section of a political manifesto and a specific search term. Your task is to extract and summarize all information related to the search term from the provided text. Focus on: 1. Specific policies, promises, or statements related to the term. 2. The context in which the term is used. 3. Any key details, figures, or commitments mentioned. Present your findings concisely. If the term is not relevant or not found in the provided text section, state that clearly. Search Term: {target_word} Manifesto Text Section: {tar_passage_truncated} Relevant Information: """ try: completion = groq_client_instance.chat.completions.create( model="llama3-8b-8192", # Use the same or a suitable model messages=[ {"role": "system", "content": "You are a helpful assistant skilled at analyzing political texts and extracting relevant information based on a search query. Provide clear, concise summaries."}, {"role": "user", "content": prompt} ], temperature=0.2, # Low temperature for more factual extraction max_tokens=1000 # Adjust based on expected output length ) result = completion.choices[0].message.content.strip() # Add a note if the input was truncated if original_length > max_context_length: result = f"(Note: Analysis based on the first {max_context_length} characters of the manifesto.)\n\n" + result return result if result else f"No specific context for '{target_word}' could be generated from the provided text section." except Exception as e: error_msg = f"Error during contextual search for '{target_word}': {str(e)}" print(error_msg) traceback.print_exc() return error_msg # Or return the error message directly def fDistance(text2Party): word_tokens_party = word_tokenize(text2Party) fdistance = FreqDist(word_tokens_party).most_common(10) mem = {x[0]: x[1] for x in fdistance} vectorizer = TfidfVectorizer(max_features=15, stop_words='english') try: tfidf_matrix = vectorizer.fit_transform(sent_tokenize(text2Party)) feature_names = vectorizer.get_feature_names_out() tfidf_scores = {} sentences = sent_tokenize(text2Party) for i, word in enumerate(feature_names): scores = [] for j in range(tfidf_matrix.shape[0]): # Iterate through sentences if i < tfidf_matrix.shape[1]: # Check if word index is valid for this sentence vector scores.append(tfidf_matrix[j, i]) if scores: tfidf_scores[word] = sum(scores) / len(scores) # Average TF-IDF score across sentences combined_scores = {} all_words = set(list(mem.keys()) + list(tfidf_scores.keys())) max_freq = max(mem.values()) if mem else 1 max_tfidf = max(tfidf_scores.values()) if tfidf_scores else 1 for word in all_words: freq_score = mem.get(word, 0) / max_freq tfidf_score = tfidf_scores.get(word, 0) / max_tfidf combined_scores[word] = (freq_score * 0.3) + (tfidf_score * 0.7) top_words = dict(sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)[:10]) return normalize(top_words) except ValueError as ve: # Handle case where TF-IDF fails (e.g., empty after processing) print(f"Warning: TF-IDF failed, using only frequency: {ve}") # Fallback to just normalized frequency if TF-IDF fails if mem: max_freq = max(mem.values()) return {k: v / max_freq for k, v in list(mem.items())[:10]} # Return top 10 freq, normalized else: return {} def normalize(d, target=1.0): raw = sum(d.values()) factor = target / raw if raw != 0 else 0 return {key: value * factor for key, value in d.items()} # --- Visualization Functions with Error Handling --- # --- Improved safe_plot to handle apply_aspect errors --- def safe_plot(func, *args, **kwargs): """Executes a plotting function and returns the image, handling errors.""" buf = None # Initialize buffer try: # Ensure a clean figure state fig = plt.figure() # Create a new figure explicitly func(*args, **kwargs) buf = BytesIO() # Try saving with bbox_inches, but catch potential apply_aspect error try: plt.savefig(buf, format='png', bbox_inches='tight') except AttributeError as ae: if "apply_aspect" in str(ae): print(f"Warning: bbox_inches='tight' failed ({ae}), saving without it.") buf.seek(0) # Reset buffer as it might be partially written buf = BytesIO() # Get a fresh buffer plt.savefig(buf, format='png') # Save without bbox_inches else: raise # Re-raise if it's a different AttributeError buf.seek(0) img = Image.open(buf) plt.close(fig) # Explicitly close the specific figure return img except Exception as e: print(f"Plotting error in safe_plot: {e}") if buf: buf.close() # Ensure buffer is closed on error if it was created traceback.print_exc() # Try to return a placeholder or None plt.close('all') # Aggressive close on error return None def fDistancePlot(text2Party): def plot_func(): tokens = word_tokenize(text2Party) if not tokens: plt.text(0.5, 0.5, "No data to plot", ha='center', va='center') return fdist = FreqDist(tokens) fdist.plot(15, title='Frequency Distribution') plt.xticks(rotation=45, ha='right') # Rotate x-axis labels if needed plt.tight_layout() return safe_plot(plot_func) # --- Updated DispersionPlot without passing 'ax' --- def DispersionPlot(textParty): """Generates the word dispersion plot.""" buf = None # Initialize buffer try: word_tokens_party = word_tokenize(textParty) if not word_tokens_party: print("Warning: No tokens found for dispersion plot.") return None moby = Text(word_tokens_party) fdistance = FreqDist(word_tokens_party) # Get top 5 words, handle potential IndexError if less than 5 unique words common_words = fdistance.most_common(6) if len(common_words) < 5: word_Lst = [word for word, _ in common_words] else: word_Lst = [common_words[x][0] for x in range(5)] if not word_Lst: print("Warning: No common words found for dispersion plot.") return None # --- Manage figure explicitly without passing 'ax' --- fig = plt.figure(figsize=(10, 5)) # Create figure explicitly plt.title('Dispersion Plot') # Call dispersion_plot without 'ax' argument moby.dispersion_plot(word_Lst) plt.tight_layout() buf = BytesIO() # Handle potential apply_aspect error for dispersion plot try: fig.savefig(buf, format='png', bbox_inches='tight') except AttributeError as ae: if "apply_aspect" in str(ae): print(f"Warning: bbox_inches='tight' failed for Dispersion Plot ({ae}), saving without it.") buf.seek(0) buf = BytesIO() # Get a fresh buffer fig.savefig(buf, format='png') else: raise # Re-raise if it's a different AttributeError buf.seek(0) img = Image.open(buf) plt.close(fig) # Close the specific figure created return img except Exception as e: print(f"Dispersion plot error: {e}") if buf: buf.close() # Ensure buffer is closed on error traceback.print_exc() plt.close('all') # Aggressive close on error return None # Return None on error # --- Updated word_cloud_generator with robust figure handling --- def word_cloud_generator(parsed_text_name, text_Party): """Generates the word cloud image.""" buf = None # Initialize buffer try: # Handle case where parsed_text_name might not have .name filename_lower = "" if hasattr(parsed_text_name, 'name') and parsed_text_name.name: filename_lower = parsed_text_name.name.lower() elif isinstance(parsed_text_name, str): filename_lower = parsed_text_name.lower() mask_path = None if 'bjp' in filename_lower: mask_path = 'bjpImg2.jpeg' elif 'congress' in filename_lower: mask_path = 'congress3.jpeg' elif 'aap' in filename_lower: mask_path = 'aapMain2.jpg' # Generate word cloud if text_Party.strip() == "": raise ValueError("Text for word cloud is empty") # Generate word cloud object if mask_path and os.path.exists(mask_path): orgImg = Image.open(mask_path) # Ensure mask is in the right format (e.g., uint8) if orgImg.mode != 'RGB': orgImg = orgImg.convert('RGB') mask = np.array(orgImg) wordcloud = WordCloud(max_words=3000, mask=mask, background_color='white', mode='RGBA').generate(text_Party) # Added mode='RGBA' else: wordcloud = WordCloud(max_words=2000, background_color='white', mode='RGBA').generate(text_Party) # --- Key Fix: Explicitly manage figure and axes for word cloud --- fig, ax = plt.subplots(figsize=(8, 6)) # Create new figure and axes ax.imshow(wordcloud, interpolation='bilinear') ax.axis("off") fig.tight_layout(pad=0) # Remove padding buf = BytesIO() # Handle potential apply_aspect error for word cloud too try: fig.savefig(buf, format='png', bbox_inches='tight', dpi=150, facecolor='white') # Added dpi and facecolor except AttributeError as ae: if "apply_aspect" in str(ae): print(f"Warning: bbox_inches='tight' failed for Word Cloud ({ae}), saving without it.") buf.seek(0) buf = BytesIO() fig.savefig(buf, format='png', dpi=150, facecolor='white') else: raise buf.seek(0) img = Image.open(buf) plt.close(fig) # Close the specific figure return img except Exception as e: print(f"Word cloud error: {e}") if buf: buf.close() # Ensure buffer is closed on error traceback.print_exc() plt.close('all') # Aggressive close on error return None # Return None on error # --- Main Analysis Function --- def analysis(Manifesto, Search): try: if Manifesto is None: return "No file uploaded", {}, None, None, None, None, None, "No file uploaded" if Search.strip() == "": Search = "government" raw_party = Parsing(Manifesto) # Uses PyPDF2 now if isinstance(raw_party, str) and raw_party.startswith("Error"): return raw_party, {}, None, None, None, None, None, "Parsing failed" text_Party = clean_text(raw_party) text_Party_processed = Preprocess(text_Party) # --- Perform Search FIRST using the ORIGINAL text for better context --- # Use the new LLM-based search function searChRes = get_contextual_search_result(Search, raw_party, groq_client) summary = generate_summary(raw_party) # Use raw_party for summary for more context? # --- Sentiment Analysis --- if not text_Party_processed.strip(): # Handle empty text after processing df_dummy = pd.DataFrame({'Polarity_Label': ['Neutral'], 'Subjectivity_Label': ['Low']}) polarity_val = 0.0 subjectivity_val = 0.0 else: polarity_val = TextBlob(text_Party_processed).sentiment.polarity subjectivity_val = TextBlob(text_Party_processed).sentiment.subjectivity polarity_label = 'Positive' if polarity_val > 0 else 'Negative' if polarity_val < 0 else 'Neutral' subjectivity_label = 'High' if subjectivity_val > 0.5 else 'Low' df_dummy = pd.DataFrame({'Polarity_Label': [polarity_label], 'Subjectivity_Label': [subjectivity_label]}) # --- Generate Plots with Safe Plotting --- # Pass the potentially empty text and handle inside plotting functions sentiment_plot = safe_plot(lambda: df_dummy['Polarity_Label'].value_counts().plot(kind='bar', color="#FF9F45", title='Sentiment Analysis')) subjectivity_plot = safe_plot(lambda: df_dummy['Subjectivity_Label'].value_counts().plot(kind='bar', color="#B667F1", title='Subjectivity Analysis')) freq_plot = fDistancePlot(text_Party_processed) dispersion_plot = DispersionPlot(text_Party_processed) # Uses updated version wordcloud = word_cloud_generator(Manifesto, text_Party_processed) # Pass Manifesto object itself, uses updated version fdist_Party = fDistance(text_Party_processed) # searChRes is now generated earlier using LLM return searChRes, fdist_Party, sentiment_plot, subjectivity_plot, wordcloud, freq_plot, dispersion_plot, summary except Exception as e: error_msg = f"Critical error in analysis function: {str(e)}" print(error_msg) traceback.print_exc() # Return error messages/images in the correct order return error_msg, {}, None, None, None, None, None, "Analysis failed" # --- Gradio Interface --- # Use Blocks for custom layout with gr.Blocks(title='Manifesto Analysis') as demo: gr.Markdown("# Manifesto Analysis") # Input Section with gr.Row(): with gr.Column(scale=1): # Adjust scale if needed file_input = gr.File(label="Upload Manifesto PDF", file_types=[".pdf"]) with gr.Column(scale=1): search_input = gr.Textbox(label="Search Term", placeholder="Enter a term to search in the manifesto") submit_btn = gr.Button("Analyze Manifesto", variant='primary') # Make button prominent # Output Section using Tabs with gr.Tabs(): # --- Summary Tab --- with gr.TabItem("Summary"): summary_output = gr.Textbox(label='AI-Generated Summary', lines=10, interactive=False) # --- Search Results Tab (uses LLM output now) --- with gr.TabItem("Search Results"): search_output = gr.Textbox(label='Context Based Search Results', lines=15, interactive=False, max_lines=20) # Increased lines/max_lines # --- Key Topics Tab --- with gr.TabItem("Key Topics"): topics_output = gr.Label(label="Most Relevant Topics (LLM Enhanced)", num_top_classes=10) # Show top 10 # --- Visualizations Tab --- with gr.TabItem("Visualizations"): # Use Rows and Columns for better arrangement with gr.Row(): # Row 1: Sentiment & Subjectivity with gr.Column(): sentiment_output = gr.Image(label='Sentiment Analysis', interactive=False, height=400) # Set height with gr.Column(): subjectivity_output = gr.Image(label='Subjectivity Analysis', interactive=False, height=400) with gr.Row(): # Row 2: Word Cloud & Frequency with gr.Column(): wordcloud_output = gr.Image(label='Word Cloud', interactive=False, height=400) with gr.Column(): freq_output = gr.Image(label='Frequency Distribution', interactive=False, height=400) with gr.Row(): # Row 3: Dispersion Plot (Full width) with gr.Column(): dispersion_output = gr.Image(label='Dispersion Plot', interactive=False, height=400) # Adjust height as needed # --- Link Button Click to Function and Outputs --- # Ensure the order of outputs matches the function return order submit_btn.click( fn=analysis, inputs=[file_input, search_input], outputs=[ search_output, # 1 (Now contextual LLM output) topics_output, # 2 sentiment_output, # 3 subjectivity_output, # 4 wordcloud_output, # 5 freq_output, # 6 dispersion_output, # 7 summary_output # 8 ], concurrency_limit=1 # Limit concurrent analyses if needed ) # --- Examples --- gr.Examples( examples=[ ["Example/AAP_Manifesto_2019.pdf", "government"], ["Example/Bjp_Manifesto_2019.pdf", "environment"], ["Example/Congress_Manifesto_2019.pdf", "safety"] ], inputs=[file_input, search_input], outputs=[search_output, topics_output, sentiment_output, subjectivity_output, wordcloud_output, freq_output, dispersion_output, summary_output], # Link examples to outputs fn=analysis # Run analysis on example click ) # Launch the app if __name__ == "__main__": demo.launch(debug=True, share=False, show_error=True)