import streamlit as st
import os
import zipfile
from io import BytesIO
from PyPDF2 import PdfReader
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer, util

# Initialize KeyBERT and Sentence Transformer
kw_model = KeyBERT()
semantic_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')


def main():
    st.title("PDF Topic Grouping App")
    st.warning("""
    **Warning**: Do not enter confidential data into this app when it is running
    in the cloud. Your information may not be secure.
    """)
    st.warning("""
    **Important**: This Space is shared with other users, meaning others can view
    your results and data. Please duplicate this Space to your own Hugging Face
    account for privacy and security.
    """)

    # Step 1: Upload PDFs
    uploaded_files = st.file_uploader("Upload PDFs", type="pdf", accept_multiple_files=True)
    if not uploaded_files:
        st.info("Please upload PDFs to continue.")
        return

    # Reset cached results whenever the set of uploaded files changes
    uploaded_file_names = [f.name for f in uploaded_files]
    if "uploaded_files" not in st.session_state or st.session_state.uploaded_files != uploaded_file_names:
        st.session_state.uploaded_files = uploaded_file_names
        st.session_state.keywords_set = None

    # Step 2: Extract text and keywords from PDFs if not already done
    if st.session_state.keywords_set is None:
        st.info("Extracting keywords from PDFs...")
        pdf_texts = {}
        keywords_set = set()
        progress1 = st.progress(0)
        total_files = len(uploaded_files)

        for i, uploaded_file in enumerate(uploaded_files):
            pdf_name = uploaded_file.name
            try:
                reader = PdfReader(uploaded_file)
                # extract_text() can return None for image-only pages, so guard with `or ""`
                text = "".join(page.extract_text() or "" for page in reader.pages)
                pdf_texts[pdf_name] = text.lower()
                extracted_keywords = kw_model.extract_keywords(text, top_n=5)
                for kw, _ in extracted_keywords:
                    keywords_set.add(kw.lower())
            except Exception as e:
                st.error(f"Failed to process {pdf_name}: {e}")
            finally:
                progress1.progress((i + 1) / total_files)

        if not pdf_texts:
            st.error("No PDFs could be processed.")
            return

        progress1.progress(1.0)
        st.session_state.pdf_texts = pdf_texts
        st.session_state.keywords_set = keywords_set

    # Step 3: Display extracted keywords and let the user select topics
    selected_keywords = st.multiselect(
        "Select at least two keywords/topics for grouping:",
        list(st.session_state.keywords_set),
        default=list(st.session_state.keywords_set)[:2]
    )

    if st.button("Confirm Keyword Selection"):
        if len(selected_keywords) < 2:
            st.error("Please select at least two keywords to continue.")
        else:
            st.session_state.selected_keywords = selected_keywords
            st.session_state.keywords_confirmed = True

    # Persist confirmation in session state; resetting it on every rerun would
    # hide the results again whenever an unrelated widget triggers a rerun.
    if not st.session_state.get("keywords_confirmed", False):
        st.stop()

    st.success("Keyword selection confirmed. Processing PDFs...")
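    # Note (assumption based on the model's published defaults): all-MiniLM-L6-v2
    # truncates input at its max sequence length (256 word-piece tokens), so only
    # the beginning of each PDF's text actually contributes to the embedding
    # computed below. Chunking the text and averaging chunk embeddings is one
    # possible refinement, not implemented here.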
    # Precompute embeddings for PDFs
    st.info("Precomputing embeddings for PDFs...")
    progress2 = st.progress(0)
    pdf_embeddings = {}
    pdf_texts = st.session_state.pdf_texts
    total_pdfs = len(pdf_texts)

    for i, (pdf_name, text) in enumerate(pdf_texts.items()):
        try:
            pdf_embeddings[pdf_name] = semantic_model.encode(text, convert_to_tensor=True)
        except Exception as e:
            st.error(f"Failed to compute embedding for {pdf_name}: {e}")
        finally:
            progress2.progress((i + 1) / total_pdfs)
    progress2.progress(1.0)

    # Precompute embeddings for selected keywords
    st.info("Precomputing embeddings for selected keywords...")
    progress3 = st.progress(0)
    selected_keywords = st.session_state.selected_keywords
    keyword_embeddings = {}
    total_keywords = len(selected_keywords)

    for i, keyword in enumerate(selected_keywords):
        try:
            keyword_embeddings[keyword] = semantic_model.encode(keyword, convert_to_tensor=True)
        except Exception as e:
            st.error(f"Failed to compute embedding for keyword '{keyword}': {e}")
        finally:
            progress3.progress((i + 1) / total_keywords)
    progress3.progress(1.0)

    # Group each PDF under the keyword with the highest cosine similarity
    st.info("Assigning PDFs to the most relevant topics...")
    pdf_groups = {keyword: [] for keyword in selected_keywords}
    for pdf_name, text_embedding in pdf_embeddings.items():
        best_keyword = None
        max_similarity = -1
        for keyword, keyword_embedding in keyword_embeddings.items():
            similarity = util.pytorch_cos_sim(text_embedding, keyword_embedding).item()
            if similarity > max_similarity:
                max_similarity = similarity
                best_keyword = keyword
        if best_keyword:
            pdf_groups[best_keyword].append(pdf_name)

    # Save grouped PDFs into per-keyword folders
    output_folder = "grouped_pdfs"
    os.makedirs(output_folder, exist_ok=True)
    for keyword, pdf_names in pdf_groups.items():
        keyword_folder = os.path.join(output_folder, keyword)
        os.makedirs(keyword_folder, exist_ok=True)
        for pdf_name in pdf_names:
            matched_file = next((f for f in uploaded_files if f.name == pdf_name), None)
            if matched_file:
                with open(os.path.join(keyword_folder, pdf_name), "wb") as f:
                    f.write(matched_file.getvalue())

    # Zip the folders
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
        for root, _, files in os.walk(output_folder):
            for file in files:
                file_path = os.path.join(root, file)
                zip_file.write(file_path, os.path.relpath(file_path, output_folder))
    zip_buffer.seek(0)

    # Clean up temporary folders
    for root, dirs, files in os.walk(output_folder, topdown=False):
        for file in files:
            os.remove(os.path.join(root, file))
        for dir in dirs:
            os.rmdir(os.path.join(root, dir))
    os.rmdir(output_folder)

    # Step 4: Download zipped file
    st.success("PDFs processed and grouped successfully!")
    st.download_button(
        label="Download Grouped PDFs",
        data=zip_buffer,
        file_name="grouped_pdfs.zip",
        mime="application/zip"
    )


if __name__ == "__main__":
    main()
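
# A minimal sketch of how to run this app locally, assuming the script is saved
# as app.py (the filename is an assumption, not fixed by this script):
#
#   pip install streamlit PyPDF2 keybert sentence-transformers
#   streamlit run app.py
#
# On first run, SentenceTransformer downloads the all-MiniLM-L6-v2 weights from
# Hugging Face, so an internet connection is required once.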