|
|
|
|
|
|
|
|
|
import os |
|
os.system("python -m spacy download en_core_web_sm") |
|
import io |
|
import base64 |
|
import streamlit as st |
|
import numpy as np |
|
import fitz |
|
import tempfile |
|
from ultralytics import YOLO |
|
from sklearn.cluster import KMeans |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
from langchain_core.output_parsers import StrOutputParser |
|
from langchain_community.document_loaders import PyMuPDFLoader |
|
from langchain_openai import OpenAIEmbeddings |
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
from langchain_text_splitters import SpacyTextSplitter |
|
from langchain_core.prompts import ChatPromptTemplate |
|
from langchain_openai import ChatOpenAI |
|
import re |
|
from PIL import Image |
|
from streamlit_chat import message |
|
|
|
|
|
|
|
model = YOLO("best.pt") |
|
openai_api_key = os.environ.get("openai_api_key") |
|
|
|
|
|
figure_class_index = 4 |
|
table_class_index = 3 |
|
|
|
|
|
def clean_text(text): |
|
return re.sub(r'\s+', ' ', text).strip() |
|
|
|
def remove_references(text): |
|
reference_patterns = [ |
|
r'\bReferences\b', r'\breferences\b', r'\bBibliography\b', r'\bCitations\b', |
|
r'\bWorks Cited\b', r'\bReference\b', r'\breference\b' |
|
] |
|
lines = text.split('\n') |
|
for i, line in enumerate(lines): |
|
if any(re.search(pattern, line, re.IGNORECASE) for pattern in reference_patterns): |
|
return '\n'.join(lines[:i]) |
|
return text |
|
|
|
def save_uploaded_file(uploaded_file): |
|
temp_file = tempfile.NamedTemporaryFile(delete=False) |
|
temp_file.write(uploaded_file.getbuffer()) |
|
temp_file.close() |
|
return temp_file.name |
|
|
|
def summarize_pdf(pdf_file_path, num_clusters=10): |
|
embeddings_model = OpenAIEmbeddings(model="text-embedding-3-small", api_key=openai_api_key) |
|
llm = ChatOpenAI(model="gpt-4o-mini", api_key=openai_api_key, temperature=0.3) |
|
prompt = ChatPromptTemplate.from_template( |
|
"""Could you please provide a concise and comprehensive summary of the given Contexts? |
|
The summary should capture the main points and key details of the text while conveying the author's intended meaning accurately. |
|
Please ensure that the summary is well-organized and easy to read, with clear headings and subheadings to guide the reader through each section. |
|
The length of the summary should be appropriate to capture the main points and key details of the text, without including unnecessary information or becoming overly long. |
|
example of summary: |
|
## Summary: |
|
## Key points: |
|
Contexts: {topic}""" |
|
) |
|
output_parser = StrOutputParser() |
|
chain = prompt | llm | output_parser |
|
|
|
loader = PyMuPDFLoader(pdf_file_path) |
|
docs = loader.load() |
|
full_text = "\n".join(doc.page_content for doc in docs) |
|
cleaned_full_text = clean_text(remove_references(full_text)) |
|
text_splitter = SpacyTextSplitter(chunk_size=500) |
|
|
|
split_contents = text_splitter.split_text(cleaned_full_text) |
|
embeddings = embeddings_model.embed_documents(split_contents) |
|
|
|
kmeans = KMeans(n_clusters=num_clusters, init='k-means++', random_state=0).fit(embeddings) |
|
closest_point_indices = [np.argmin(np.linalg.norm(embeddings - center, axis=1)) for center in kmeans.cluster_centers_] |
|
extracted_contents = [split_contents[idx] for idx in closest_point_indices] |
|
|
|
results = chain.invoke({"topic": ' '.join(extracted_contents)}) |
|
|
|
return generate_citations(results, extracted_contents) |
|
|
|
def qa_pdf(pdf_file_path, query, num_clusters=5, similarity_threshold=0.6): |
|
embeddings_model = OpenAIEmbeddings(model="text-embedding-3-small", api_key=openai_api_key) |
|
llm = ChatOpenAI(model="gpt-4o-mini", api_key=openai_api_key, temperature=0.3) |
|
prompt = ChatPromptTemplate.from_template( |
|
"""Please provide a detailed and accurate answer to the given question based on the provided contexts. |
|
Ensure that the answer is comprehensive and directly addresses the query. |
|
If necessary, include relevant examples or details from the text. |
|
Question: {question} |
|
Contexts: {contexts}""" |
|
) |
|
output_parser = StrOutputParser() |
|
chain = prompt | llm | output_parser |
|
|
|
loader = PyMuPDFLoader(pdf_file_path) |
|
docs = loader.load() |
|
full_text = "\n".join(doc.page_content for doc in docs) |
|
cleaned_full_text = clean_text(remove_references(full_text)) |
|
text_splitter = SpacyTextSplitter(chunk_size=500) |
|
|
|
|
|
split_contents = text_splitter.split_text(cleaned_full_text) |
|
embeddings = embeddings_model.embed_documents(split_contents) |
|
|
|
query_embedding = embeddings_model.embed_query(query) |
|
similarity_scores = cosine_similarity([query_embedding], embeddings)[0] |
|
top_indices = np.argsort(similarity_scores)[-num_clusters:] |
|
relevant_contents = [split_contents[i] for i in top_indices] |
|
|
|
results = chain.invoke({"question": query, "contexts": ' '.join(relevant_contents)}) |
|
|
|
return generate_citations(results, relevant_contents, similarity_threshold) |
|
|
|
def generate_citations(text, contents, similarity_threshold=0.6): |
|
embeddings_model = OpenAIEmbeddings(model="text-embedding-3-small", api_key=openai_api_key) |
|
text_sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text) |
|
text_embeddings = embeddings_model.embed_documents(text_sentences) |
|
content_embeddings = embeddings_model.embed_documents(contents) |
|
similarity_matrix = cosine_similarity(text_embeddings, content_embeddings) |
|
|
|
cited_text = text |
|
relevant_sources = [] |
|
source_mapping = {} |
|
sentence_to_source = {} |
|
|
|
for i, sentence in enumerate(text_sentences): |
|
if sentence in sentence_to_source: |
|
continue |
|
max_similarity = max(similarity_matrix[i]) |
|
if max_similarity >= similarity_threshold: |
|
most_similar_idx = np.argmax(similarity_matrix[i]) |
|
if most_similar_idx not in source_mapping: |
|
source_mapping[most_similar_idx] = len(relevant_sources) + 1 |
|
relevant_sources.append((most_similar_idx, contents[most_similar_idx])) |
|
citation_idx = source_mapping[most_similar_idx] |
|
citation = f"([Source {citation_idx}](#source-{citation_idx}))" |
|
cited_sentence = re.sub(r'([.!?])$', f" {citation}\\1", sentence) |
|
sentence_to_source[sentence] = citation_idx |
|
cited_text = cited_text.replace(sentence, cited_sentence) |
|
|
|
sources_list = "\n\n## Sources:\n" |
|
for idx, (original_idx, content) in enumerate(relevant_sources): |
|
sources_list += f""" |
|
<details style="margin: 1px 0; padding: 5px; border: 1px solid #ccc; border-radius: 8px; background-color: #f9f9f9; transition: all 0.3s ease;"> |
|
<summary style="font-weight: bold; cursor: pointer; outline: none; padding: 5px 0; transition: color 0.3s ease;">Source {idx + 1}</summary> |
|
<pre style="white-space: pre-wrap; word-wrap: break-word; margin: 1px 0; padding: 10px; background-color: #fff; border-radius: 5px; border: 1px solid #ddd; box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);">{content}</pre> |
|
</details> |
|
""" |
|
|
|
|
|
dummy_blanks = """ |
|
<div style="margin: 20px 0;"></div> |
|
<div style="margin: 20px 0;"></div> |
|
<div style="margin: 20px 0;"></div> |
|
<div style="margin: 20px 0;"></div> |
|
<div style="margin: 20px 0;"></div> |
|
|
|
""" |
|
|
|
cited_text += sources_list + dummy_blanks |
|
return cited_text |
|
|
|
def infer_image_and_get_boxes(image, confidence_threshold=0.8): |
|
results = model.predict(image) |
|
return [ |
|
(int(box.xyxy[0][0]), int(box.xyxy[0][1]), int(box.xyxy[0][2]), int(box.xyxy[0][3]), int(box.cls[0])) |
|
for result in results for box in result.boxes |
|
if int(box.cls[0]) in {figure_class_index, table_class_index} and box.conf[0] > confidence_threshold |
|
] |
|
|
|
def crop_images_from_boxes(image, boxes, scale_factor): |
|
figures = [] |
|
tables = [] |
|
for (x1, y1, x2, y2, cls) in boxes: |
|
cropped_img = image[int(y1 * scale_factor):int(y2 * scale_factor), int(x1 * scale_factor):int(x2 * scale_factor)] |
|
if cls == figure_class_index: |
|
figures.append(cropped_img) |
|
elif cls == table_class_index: |
|
tables.append(cropped_img) |
|
return figures, tables |
|
|
|
def process_pdf(pdf_file_path): |
|
doc = fitz.open(pdf_file_path) |
|
all_figures = [] |
|
all_tables = [] |
|
low_dpi = 50 |
|
high_dpi = 300 |
|
scale_factor = high_dpi / low_dpi |
|
low_res_pixmaps = [page.get_pixmap(dpi=low_dpi) for page in doc] |
|
|
|
for page_num, low_res_pix in enumerate(low_res_pixmaps): |
|
low_res_img = np.frombuffer(low_res_pix.samples, dtype=np.uint8).reshape(low_res_pix.height, low_res_pix.width, 3) |
|
boxes = infer_image_and_get_boxes(low_res_img) |
|
|
|
if boxes: |
|
high_res_pix = doc[page_num].get_pixmap(dpi=high_dpi) |
|
high_res_img = np.frombuffer(high_res_pix.samples, dtype=np.uint8).reshape(high_res_pix.height, high_res_pix.width, 3) |
|
figures, tables = crop_images_from_boxes(high_res_img, boxes, scale_factor) |
|
all_figures.extend(figures) |
|
all_tables.extend(tables) |
|
|
|
return all_figures, all_tables |
|
|
|
def image_to_base64(img): |
|
buffered = io.BytesIO() |
|
img = Image.fromarray(img) |
|
img.save(buffered, format="PNG") |
|
return base64.b64encode(buffered.getvalue()).decode() |
|
|
|
def on_btn_click(): |
|
del st.session_state.chat_history[:] |
|
|
|
|
|
|
|
|
|
uploadercss=''' |
|
<style> |
|
[data-testid='stFileUploader'] { |
|
width: max-content; |
|
} |
|
[data-testid='stFileUploader'] section { |
|
padding: 0; |
|
float: left; |
|
} |
|
[data-testid='stFileUploader'] section > input + div { |
|
display: none; |
|
} |
|
[data-testid='stFileUploader'] section + div { |
|
float: right; |
|
padding-top: 0; |
|
} |
|
|
|
</style> |
|
''' |
|
|
|
st.set_page_config(page_title="PDF Reading Assistant", page_icon="π") |
|
|
|
|
|
if 'chat_history' not in st.session_state: |
|
st.session_state.chat_history = [] |
|
|
|
st.title("π PDF Reading Assistant") |
|
st.markdown("### Extract tables, figures, summaries, and answers from your PDF files easily.") |
|
chat_placeholder = st.empty() |
|
|
|
|
|
uploaded_file = st.file_uploader("Upload a PDF", type="pdf") |
|
st.markdown(uploadercss, unsafe_allow_html=True) |
|
if uploaded_file: |
|
file_path = save_uploaded_file(uploaded_file) |
|
|
|
|
|
chat_container = st.container() |
|
user_input = st.chat_input("Ask a question about the pdf......", key="user_input") |
|
with chat_container: |
|
|
|
for idx, chat in enumerate(st.session_state.chat_history): |
|
if chat.get("user"): |
|
message(chat["user"], is_user=True, allow_html=True, key=f"user_{idx}", avatar_style="initials", seed="user") |
|
if chat.get("bot"): |
|
message(chat["bot"], is_user=False, allow_html=True, key=f"bot_{idx}",seed="bot") |
|
|
|
|
|
with st.form(key="chat_form", clear_on_submit=True,border=False): |
|
|
|
col1, col2, col3 = st.columns([1, 1, 1]) |
|
with col1: |
|
summary_button = st.form_submit_button("Generate Summary") |
|
with col2: |
|
extract_button = st.form_submit_button("Extract Tables and Figures") |
|
with col3: |
|
st.form_submit_button("Clear message", on_click=on_btn_click) |
|
|
|
|
|
if summary_button: |
|
with st.spinner("Generating summary..."): |
|
summary = summarize_pdf(file_path) |
|
st.session_state.chat_history.append({"user": "Generate Summary", "bot": summary}) |
|
st.rerun() |
|
|
|
if extract_button: |
|
with st.spinner("Extracting tables and figures..."): |
|
figures, tables = process_pdf(file_path) |
|
if figures: |
|
st.session_state.chat_history.append({"user": "Figures"}) |
|
|
|
for idx, figure in enumerate(figures): |
|
figure_base64 = image_to_base64(figure) |
|
result_html = f'<img src="data:image/png;base64,{figure_base64}" style="width:100%; display:block;" alt="Figure {idx+1}"/>' |
|
st.session_state.chat_history.append({"bot": f"Figure {idx+1} {result_html}"}) |
|
if tables: |
|
st.session_state.chat_history.append({"user": "Tables"}) |
|
for idx, table in enumerate(tables): |
|
table_base64 = image_to_base64(table) |
|
result_html = f'<img src="data:image/png;base64,{table_base64}" style="width:100%; display:block;" alt="Table {idx+1}"/>' |
|
st.session_state.chat_history.append({"bot": f"Table {idx+1} {result_html}"}) |
|
st.rerun() |
|
|
|
if user_input: |
|
st.session_state.chat_history.append({"user": user_input, "bot": None}) |
|
with st.spinner("Processing..."): |
|
answer = qa_pdf(file_path, user_input) |
|
st.session_state.chat_history[-1]["bot"] = answer |
|
st.rerun() |
|
|
|
|
|
st.markdown(""" |
|
<style> |
|
#chat-container { |
|
max-height: 500px; |
|
overflow-y: auto; |
|
padding: 1rem; |
|
border: 1px solid #ddd; |
|
border-radius: 8px; |
|
background-color: #fefefe; |
|
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); |
|
transition: background-color 0.3s ease; |
|
} |
|
#chat-container:hover { |
|
background-color: #f9f9f9; |
|
} |
|
.stChatMessage { |
|
padding: 0.75rem; |
|
margin: 0.75rem 0; |
|
border-radius: 8px; |
|
box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1); |
|
transition: background-color 0.3s ease; |
|
} |
|
.stChatMessage--user { |
|
background-color: #E3F2FD; |
|
} |
|
.stChatMessage--user:hover { |
|
background-color: #BBDEFB; |
|
} |
|
.stChatMessage--bot { |
|
background-color: #EDE7F6; |
|
} |
|
.stChatMessage--bot:hover { |
|
background-color: #D1C4E9; |
|
} |
|
textarea { |
|
width: 100%; |
|
padding: 1rem; |
|
border: 1px solid #ddd; |
|
border-radius: 8px; |
|
box-shadow: inset 0 1px 3px rgba(0, 0, 0, 0.1); |
|
transition: border-color 0.3s ease, box-shadow 0.3s ease; |
|
} |
|
textarea:focus { |
|
border-color: #4CAF50; |
|
box-shadow: 0 0 5px rgba(76, 175, 80, 0.5); |
|
} |
|
.stButton > button { |
|
width: 100%; |
|
background-color: #4CAF50; |
|
color: white; |
|
border: none; |
|
border-radius: 8px; |
|
padding: 0.75rem; |
|
font-size: 16px; |
|
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); |
|
transition: background-color 0.3s ease, box-shadow 0.3s ease; |
|
} |
|
.stButton > button:hover { |
|
background-color: #45A049; |
|
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1); |
|
} |
|
</style> |
|
<script> |
|
const chatContainer = document.getElementById('chat-container'); |
|
chatContainer.scrollTop = chatContainer.scrollHeight; |
|
</script> |
|
""", unsafe_allow_html=True) |
|
|