Spaces:
Build error
Build error
| import streamlit as st | |
| import os | |
| import json | |
| import requests | |
| import pdfplumber | |
| import chromadb | |
| import re | |
| from langchain.document_loaders import PDFPlumberLoader | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_experimental.text_splitter import SemanticChunker | |
| from langchain_chroma import Chroma | |
| from langchain.chains import LLMChain | |
| from langchain.prompts import PromptTemplate | |
| from langchain_groq import ChatGroq | |
| from prompts import rag_prompt, relevancy_prompt, relevant_context_picker_prompt, response_synth | |
| # ----------------- Streamlit UI Setup ----------------- | |
| st.set_page_config(page_title="Blah-1", layout="centered") | |
| # ----------------- API Keys ----------------- | |
| os.environ["GROQ_API_KEY"] = st.secrets.get("GROQ_API_KEY", "") | |
| # Load LLM models | |
| llm_judge = ChatGroq(model="deepseek-r1-distill-llama-70b") | |
| rag_llm = ChatGroq(model="mixtral-8x7b-32768") | |
| llm_judge.verbose = True | |
| rag_llm.verbose = True | |
| # Clear ChromaDB cache to fix tenant issue | |
| chromadb.api.client.SharedSystemClient.clear_system_cache() | |
| # ----------------- ChromaDB Persistent Directory ----------------- | |
| CHROMA_DB_DIR = "/mnt/data/chroma_db" | |
| os.makedirs(CHROMA_DB_DIR, exist_ok=True) | |
| # ----------------- Initialize Session State ----------------- | |
| if "pdf_loaded" not in st.session_state: | |
| st.session_state.pdf_loaded = False | |
| if "chunked" not in st.session_state: | |
| st.session_state.chunked = False | |
| if "vector_created" not in st.session_state: | |
| st.session_state.vector_created = False | |
| if "processed_chunks" not in st.session_state: | |
| st.session_state.processed_chunks = None | |
| if "vector_store" not in st.session_state: | |
| st.session_state.vector_store = None | |
| # ----------------- Text Cleaning Functions ----------------- | |
| def clean_extracted_text(text): | |
| """ | |
| Cleans extracted PDF text by removing excessive line breaks, fixing spacing issues, and resolving OCR artifacts. | |
| """ | |
| text = re.sub(r'\n+', '\n', text) # Remove excessive newlines | |
| text = re.sub(r'\s{2,}', ' ', text) # Remove extra spaces | |
| text = re.sub(r'(\w)-\n(\w)', r'\1\2', text) # Fix hyphenated words split by a newline | |
| return text.strip() | |
| def extract_title_manually(text): | |
| """ | |
| Attempts to find the title by checking the first few lines. | |
| - Titles are usually long enough (more than 5 words). | |
| - Ignores common header text like "Abstract", "Introduction". | |
| """ | |
| lines = text.split("\n") | |
| ignore_keywords = ["abstract", "introduction", "keywords", "contents", "table", "figure"] | |
| for line in lines[:5]: # Check only the first 5 lines | |
| clean_line = line.strip() | |
| if len(clean_line.split()) > 5 and not any(word.lower() in clean_line.lower() for word in ignore_keywords): | |
| return clean_line # Return first valid title | |
| return "Unknown" | |
| # ----------------- Metadata Extraction ----------------- | |
| # ----------------- Metadata Extraction ----------------- | |
| def extract_metadata(pdf_path): | |
| """Extracts metadata using simple heuristics without LLM.""" | |
| with pdfplumber.open(pdf_path) as pdf: | |
| if not pdf.pages: | |
| return { | |
| "Title": "Unknown", | |
| "Author": "Unknown", | |
| "Emails": "No emails found", | |
| "Affiliations": "No affiliations found" | |
| } | |
| # Extract text from the first page | |
| first_page_text = pdf.pages[0].extract_text() or "No text found." | |
| cleaned_text = clean_extracted_text(first_page_text) | |
| # Extract Title | |
| pre_extracted_title = extract_title_manually(cleaned_text) | |
| # Extract Authors (Names typically appear before affiliations) | |
| author_pattern = re.compile(r"([\w\-\s]+,\s?)+[\w\-\s]+") | |
| authors = "Unknown" | |
| for line in cleaned_text.split("\n"): | |
| match = author_pattern.search(line) | |
| if match: | |
| authors = match.group(0) | |
| break | |
| # Extract Emails | |
| email_pattern = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}") | |
| emails = ", ".join(email_pattern.findall(cleaned_text)) or "No emails found" | |
| # Extract Affiliations (usually below author names) | |
| affiliations = "Unknown" | |
| for i, line in enumerate(cleaned_text.split("\n")): | |
| if "@" in line: # Email appears before affiliations | |
| affiliations = cleaned_text.split("\n")[i + 1] if i + 1 < len(cleaned_text.split("\n")) else "Unknown" | |
| break | |
| return { | |
| "Title": pre_extracted_title, | |
| "Author": authors, | |
| "Emails": emails, | |
| "Affiliations": affiliations | |
| } | |
| # ----------------- Step 1: Choose PDF Source ----------------- | |
| pdf_source = st.radio("Upload or provide a link to a PDF:", ["Upload a PDF file", "Enter a PDF URL"], index=0, horizontal=True) | |
| if pdf_source == "Upload a PDF file": | |
| uploaded_file = st.file_uploader("Upload your PDF file", type=["pdf"]) | |
| if uploaded_file: | |
| st.session_state.pdf_path = "/mnt/data/temp.pdf" | |
| with open(st.session_state.pdf_path, "wb") as f: | |
| f.write(uploaded_file.getbuffer()) | |
| st.session_state.pdf_loaded = False | |
| st.session_state.chunked = False | |
| st.session_state.vector_created = False | |
| elif pdf_source == "Enter a PDF URL": | |
| pdf_url = st.text_input("Enter PDF URL:") | |
| if pdf_url and not st.session_state.pdf_loaded: | |
| with st.spinner("π Downloading PDF..."): | |
| try: | |
| response = requests.get(pdf_url) | |
| if response.status_code == 200: | |
| st.session_state.pdf_path = "/mnt/data/temp.pdf" | |
| with open(st.session_state.pdf_path, "wb") as f: | |
| f.write(response.content) | |
| st.session_state.pdf_loaded = False | |
| st.session_state.chunked = False | |
| st.session_state.vector_created = False | |
| st.success("β PDF Downloaded Successfully!") | |
| else: | |
| st.error("β Failed to download PDF. Check the URL.") | |
| except Exception as e: | |
| st.error(f"Error downloading PDF: {e}") | |
| # ----------------- Process PDF ----------------- | |
| if not st.session_state.pdf_loaded and "pdf_path" in st.session_state: | |
| with st.spinner("π Processing document... Please wait."): | |
| loader = PDFPlumberLoader(st.session_state.pdf_path) | |
| docs = loader.load() | |
| st.json(docs[0].metadata) | |
| # Extract metadata | |
| metadata = extract_metadata(st.session_state.pdf_path) | |
| # Display extracted-metadata | |
| if isinstance(metadata, dict): | |
| st.subheader("π Extracted Document Metadata") | |
| st.write(f"**Title:** {metadata.get('Title', 'Unknown')}") | |
| st.write(f"**Author:** {metadata.get('Author', 'Unknown')}") | |
| st.write(f"**Emails:** {metadata.get('Emails', 'No emails found')}") | |
| st.write(f"**Affiliations:** {metadata.get('Affiliations', 'No affiliations found')}") | |
| else: | |
| st.error("Metadata extraction failed.") | |
| # Embedding Model | |
| model_name = "nomic-ai/modernbert-embed-base" | |
| embedding_model = HuggingFaceEmbeddings(model_name=model_name, model_kwargs={"device": "cpu"}, encode_kwargs={'normalize_embeddings': False}) | |
| # Convert metadata into a retrievable chunk | |
| metadata_doc = {"page_content": metadata, "metadata": {"source": "metadata"}} | |
| # Prevent unnecessary re-chunking | |
| if not st.session_state.chunked: | |
| text_splitter = SemanticChunker(embedding_model) | |
| document_chunks = text_splitter.split_documents(docs) | |
| document_chunks.insert(0, metadata_doc) # Insert metadata as a retrievable document | |
| st.session_state.processed_chunks = document_chunks | |
| st.session_state.chunked = True | |
| st.session_state.pdf_loaded = True | |
| st.success("β Document processed and chunked successfully!") | |
| # ----------------- Setup Vector Store ----------------- | |
| if not st.session_state.vector_created and st.session_state.processed_chunks: | |
| with st.spinner("π Initializing Vector Store..."): | |
| st.session_state.vector_store = Chroma( | |
| persist_directory=CHROMA_DB_DIR, # <-- Ensures persistence | |
| collection_name="deepseek_collection", | |
| collection_metadata={"hnsw:space": "cosine"}, | |
| embedding_function=embedding_model | |
| ) | |
| st.session_state.vector_store.add_documents(st.session_state.processed_chunks) | |
| st.session_state.vector_created = True | |
| st.success("β Vector store initialized successfully!") | |
| # ----------------- Query Input ----------------- | |
| query = st.text_input("π Ask a question about the document:") | |
| if query: | |
| with st.spinner("π Retrieving relevant context..."): | |
| retriever = st.session_state.vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 5}) | |
| retrieved_docs = retriever.invoke(query) | |
| context = [d.page_content for d in retrieved_docs] | |
| st.success("β Context retrieved successfully!") | |
| # ----------------- Run Individual Chains Explicitly ----------------- | |
| context_relevancy_chain = LLMChain(llm=llm_judge, prompt=PromptTemplate(input_variables=["retriever_query", "context"], template=relevancy_prompt), output_key="relevancy_response") | |
| relevant_context_chain = LLMChain(llm=llm_judge, prompt=PromptTemplate(input_variables=["relevancy_response"], template=relevant_context_picker_prompt), output_key="context_number") | |
| relevant_contexts_chain = LLMChain(llm=llm_judge, prompt=PromptTemplate(input_variables=["context_number", "context"], template=response_synth), output_key="relevant_contexts") | |
| response_chain = LLMChain(llm=rag_llm, prompt=PromptTemplate(input_variables=["query", "context"], template=rag_prompt), output_key="final_response") | |
| response_crisis = context_relevancy_chain.invoke({"context": context, "retriever_query": query}) | |
| relevant_response = relevant_context_chain.invoke({"relevancy_response": response_crisis["relevancy_response"]}) | |
| contexts = relevant_contexts_chain.invoke({"context_number": relevant_response["context_number"], "context": context}) | |
| final_response = response_chain.invoke({"query": query, "context": contexts["relevant_contexts"]}) | |
| # ----------------- Display All Outputs ----------------- | |
| st.markdown("### Context Relevancy Evaluation") | |
| st.json(response_crisis["relevancy_response"]) | |
| st.markdown("### Picked Relevant Contexts") | |
| st.json(relevant_response["context_number"]) | |
| st.markdown("### Extracted Relevant Contexts") | |
| st.json(contexts["relevant_contexts"]) | |
| st.subheader("context_relevancy_evaluation_chain Statement") | |
| st.json(final_response["relevancy_response"]) | |
| st.subheader("pick_relevant_context_chain Statement") | |
| st.json(final_response["context_number"]) | |
| st.subheader("relevant_contexts_chain Statement") | |
| st.json(final_response["relevant_contexts"]) | |
| st.subheader("RAG Response Statement") | |
| st.json(final_response["final_response"]) |