import streamlit as st
import os
import zipfile
from io import BytesIO
from PyPDF2 import PdfReader
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer, util

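# Load the KeyBERT keyword-extraction model and the sentence-embedding model once at startup.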
kw_model = KeyBERT()
semantic_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

def main():
    st.title("PDF Topic Grouping App")

    st.warning("""
    **Warning**: Do not enter confidential data into this app when it is running in the cloud.
    Your information may not be secure.
    """)

    st.warning("""
    **Important**: This Space is shared with other users, meaning others can view your results and data.
    Please duplicate this Space to your own Hugging Face account for privacy and security.
    """)

    uploaded_files = st.file_uploader("Upload PDFs", type="pdf", accept_multiple_files=True)

    if not uploaded_files:
        st.info("Please upload PDFs to continue.")
        return

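    # Reset cached keyword results whenever the set of uploaded files changes.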
    uploaded_file_names = [f.name for f in uploaded_files]
    if "uploaded_files" not in st.session_state or st.session_state.uploaded_files != uploaded_file_names:
        st.session_state.uploaded_files = uploaded_file_names
        st.session_state.keywords_set = None

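    # Extract candidate keywords only once per upload; the texts and keywords are cached in session state.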
    if st.session_state.keywords_set is None:
        st.info("Extracting keywords from PDFs...")
        pdf_texts = {}
        keywords_set = set()

        progress1 = st.progress(0)
        total_files = len(uploaded_files)

        for i, uploaded_file in enumerate(uploaded_files):
            pdf_name = uploaded_file.name
            try:
                reader = PdfReader(uploaded_file)
                # extract_text() can return None for pages without a text layer.
                text = "".join(page.extract_text() or "" for page in reader.pages)
                pdf_texts[pdf_name] = text.lower()

                extracted_keywords = kw_model.extract_keywords(text, top_n=5)
                for kw, _ in extracted_keywords:
                    keywords_set.add(kw.lower())
            except Exception as e:
                st.error(f"Failed to process {pdf_name}: {e}")
            finally:
                progress1.progress((i + 1) / total_files)

        if not pdf_texts:
            st.error("No PDFs could be processed.")
            return

        progress1.progress(1.0)
        st.session_state.pdf_texts = pdf_texts
        st.session_state.keywords_set = keywords_set

    selected_keywords = st.multiselect(
        "Select at least two keywords/topics for grouping:",
        list(st.session_state.keywords_set),
        default=list(st.session_state.keywords_set)[:2]
    )

    if st.button("Confirm Keyword Selection"):
        if len(selected_keywords) < 2:
            st.error("Please select at least two keywords to continue.")
        else:
            st.session_state.selected_keywords = selected_keywords
            st.session_state.keywords_confirmed = True
    else:
        st.session_state.keywords_confirmed = False

    if not st.session_state.get("keywords_confirmed", False):
        st.stop()

    st.success("Keyword selection confirmed. Processing PDFs...")

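    # Embed each PDF's full text and, below, each selected keyword with the same
    # sentence-transformer model so the two can be compared by cosine similarity.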
    st.info("Precomputing embeddings for PDFs...")
    progress2 = st.progress(0)
    pdf_embeddings = {}
    pdf_texts = st.session_state.pdf_texts
    total_pdfs = len(pdf_texts)

    for i, (pdf_name, text) in enumerate(pdf_texts.items()):
        try:
            pdf_embeddings[pdf_name] = semantic_model.encode(text, convert_to_tensor=True)
        except Exception as e:
            st.error(f"Failed to compute embedding for {pdf_name}: {e}")
        finally:
            progress2.progress((i + 1) / total_pdfs)

    progress2.progress(1.0)

    st.info("Precomputing embeddings for selected keywords...")
    progress3 = st.progress(0)
    selected_keywords = st.session_state.selected_keywords
    keyword_embeddings = {}
    total_keywords = len(selected_keywords)

    for i, keyword in enumerate(selected_keywords):
        try:
            keyword_embeddings[keyword] = semantic_model.encode(keyword, convert_to_tensor=True)
        except Exception as e:
            st.error(f"Failed to compute embedding for keyword '{keyword}': {e}")
        finally:
            progress3.progress((i + 1) / total_keywords)

    progress3.progress(1.0)

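    # Assign each PDF to the single keyword whose embedding is most similar to its text.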
    st.info("Assigning PDFs to the most relevant topics...")
    pdf_groups = {keyword: [] for keyword in selected_keywords}

    for pdf_name, text_embedding in pdf_embeddings.items():
        best_keyword = None
        max_similarity = -1

        for keyword, keyword_embedding in keyword_embeddings.items():
            similarity = util.pytorch_cos_sim(text_embedding, keyword_embedding).item()
            if similarity > max_similarity:
                max_similarity = similarity
                best_keyword = keyword

        if best_keyword:
            pdf_groups[best_keyword].append(pdf_name)

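    # Write each PDF into a folder named after its assigned keyword.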
    output_folder = "grouped_pdfs"
    os.makedirs(output_folder, exist_ok=True)

    for keyword, pdf_names in pdf_groups.items():
        keyword_folder = os.path.join(output_folder, keyword)
        os.makedirs(keyword_folder, exist_ok=True)
        for pdf_name in pdf_names:
            matched_file = next((f for f in uploaded_files if f.name == pdf_name), None)
            if matched_file:
                with open(os.path.join(keyword_folder, pdf_name), "wb") as f:
                    f.write(matched_file.getvalue())

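    # Bundle the grouped folders into an in-memory ZIP archive for download.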
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
        for root, _, files in os.walk(output_folder):
            for file in files:
                file_path = os.path.join(root, file)
                zip_file.write(file_path, os.path.relpath(file_path, output_folder))
    zip_buffer.seek(0)

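    # Delete the temporary folder tree now that the archive lives in memory.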
    for root, dirs, files in os.walk(output_folder, topdown=False):
        for file in files:
            os.remove(os.path.join(root, file))
        for dir in dirs:
            os.rmdir(os.path.join(root, dir))
    os.rmdir(output_folder)

    st.success("PDFs processed and grouped successfully!")
    st.download_button(
        label="Download Grouped PDFs",
        data=zip_buffer,
        file_name="grouped_pdfs.zip",
        mime="application/zip"
    )


if __name__ == "__main__":
    main()