Spaces:
Sleeping
Sleeping
from dotenv import load_dotenv | |
import os | |
import streamlit as st | |
from PyPDF2 import PdfFileReader | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.vectorstores import FAISS | |
from langchain.chains.question_answering import load_qa_chain | |
from langchain.llms import OpenAI as LLMSOpenAI | |
from langchain.llms import AzureOpenAI | |
from langchain.callbacks import get_openai_callback | |
from langchain.chat_models import ChatOpenAI | |
from docx import Document | |
from openpyxl import load_workbook | |
import pdfplumber | |
def extract_text_from_pdf(pdf_file):
    """Return the text of every page of a PDF as a single string.

    Args:
        pdf_file: Path or file-like object passed to ``pdfplumber.open``.

    Returns:
        The text of all pages joined with newlines. Pages for which no
        text could be extracted contribute an empty string.
    """
    with pdfplumber.open(pdf_file) as pdf:
        # extract_text() can return None for a page without a text layer
        # (e.g. a scanned image) on some pdfplumber versions; fall back to
        # "" so one such page does not abort the whole document.
        page_texts = [page.extract_text() or "" for page in pdf.pages]
    # Join with a newline so the last word of one page is not fused with
    # the first word of the next.
    return "\n".join(page_texts)
def extract_text_from_docx(docx_file):
    """Return the paragraph texts of a .docx document, one per line.

    Args:
        docx_file: Path or file-like object passed to ``Document``.

    Returns:
        The text of each entry in ``doc.paragraphs`` joined with newlines.
    """
    return "\n".join(para.text for para in Document(docx_file).paragraphs)
def extract_text_from_excel(excel_file):
    """Return every non-empty cell value of a workbook, one value per line.

    Args:
        excel_file: Path or file-like object passed to ``load_workbook``.

    Returns:
        A string with ``str(value)`` followed by a newline for each
        non-empty cell, visiting worksheets in workbook order and cells
        row by row.
    """
    workbook = load_workbook(excel_file)
    lines = []
    # Use .worksheets rather than indexing by sheet name: chartsheets also
    # appear in .sheetnames but have no cells to iterate.
    for worksheet in workbook.worksheets:
        for row in worksheet.iter_rows():
            for cell in row:
                value = cell.value
                # Skip only genuinely empty cells. A plain truthiness test
                # would also discard legitimate 0 / False values.
                if value is None or value == "":
                    continue
                lines.append(f"{value}\n")
    return "".join(lines)
def split_text_into_chunks(text, chunk_size=1000, chunk_overlap=200):
    """Split text into overlapping chunks on newline boundaries.

    Args:
        text: The text to split.
        chunk_size: Maximum chunk length, measured with ``len``.
            Defaults to 1000.
        chunk_overlap: Number of characters shared between consecutive
            chunks. Defaults to 200.

    Returns:
        The list of chunks produced by ``CharacterTextSplitter.split_text``.
    """
    text_splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return text_splitter.split_text(text)
def create_knowledge_base(chunks, api_key=None):
    """Build a FAISS vector store from text chunks using OpenAI embeddings.

    Args:
        chunks: Texts to embed and index.
        api_key: OpenAI API key forwarded to ``OpenAIEmbeddings``.

    Returns:
        The store returned by ``FAISS.from_texts``.
    """
    return FAISS.from_texts(chunks, OpenAIEmbeddings(openai_api_key=api_key))
def answer_question(question, knowledge_base, model):
    """Answer a question from the documents most similar to it.

    Args:
        question: The user's question.
        knowledge_base: Vector store exposing ``similarity_search``.
        model: LLM class; instantiated with ``model_name="gpt-3.5-turbo"``
            and the API key stored in ``st.session_state.api_key``.

    Returns:
        The response produced by the "stuff" question-answering chain.
    """
    matching_docs = knowledge_base.similarity_search(question)
    language_model = model(
        model_name="gpt-3.5-turbo",
        openai_api_key=st.session_state.api_key,
    )
    qa_chain = load_qa_chain(language_model, chain_type="stuff")
    # The chain runs inside the OpenAI callback context, as before; the
    # callback object itself is not used.
    with get_openai_callback():
        return qa_chain.run(input_documents=matching_docs, question=question)
def save_api_key(api_key):
    """Store the given API key in the Streamlit session state."""
    st.session_state["api_key"] = api_key
def main():
    """Render the Streamlit app: settings sidebar, upload, and Q&A.

    The sidebar collects the API key, the language model and the chunking
    settings. Once an API key has been saved and a document uploaded, the
    document text is extracted, chunked, embedded into a knowledge base,
    and used to answer the question typed by the user.
    """
    load_dotenv()
    st.set_page_config(page_title="Ask Your PDF", layout="wide")

    # Sidebar
    st.sidebar.title("Settings")

    # API Key input
    st.sidebar.subheader("API Key")
    api_key = st.sidebar.text_input("Insert your API Key", type="password")
    st.sidebar.button("Save API Key", on_click=save_api_key, args=(api_key,))

    model_type = st.sidebar.selectbox("Select Language Model", ["OpenAI", "AzureOpenAI"])
    model = AzureOpenAI if model_type == "AzureOpenAI" else ChatOpenAI

    chunk_size = st.sidebar.slider("Chunk Size", min_value=500, max_value=2000, value=1000, step=100)
    chunk_overlap = st.sidebar.slider("Chunk Overlap", min_value=100, max_value=500, value=200, step=50)
    show_content = st.sidebar.checkbox("Show Document Content")
    show_answers = st.sidebar.checkbox("Show Previous Answers")

    # Main content
    st.title("Ask Your Document 💭")
    file_format = st.selectbox("Select File Format", ["PDF", "docx", "xlsx"])
    document = st.file_uploader("Upload Document", type=[file_format.lower()])

    if not st.session_state.get("api_key"):
        st.warning("You need to insert your API Key first.")
        return
    if document is None:
        return

    if file_format == "PDF":
        text = extract_text_from_pdf(document)
    elif file_format == "docx":
        text = extract_text_from_docx(document)
    elif file_format == "xlsx":
        text = extract_text_from_excel(document)
    else:
        text = ""

    if show_content:
        st.subheader("Document Text:")
        st.text_area("Content", value=text, height=300)

    # Build the splitter here so the sidebar sliders actually control the
    # chunking; calling split_text_into_chunks(text) would use that
    # helper's default sizes regardless of the slider values.
    text_splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    chunks = text_splitter.split_text(text)
    if not chunks:
        # A vector store cannot be built from zero texts (e.g. a scanned
        # PDF or an empty workbook); report it instead of crashing.
        st.error("No text could be extracted from the document.")
        return

    knowledge_base = create_knowledge_base(chunks, api_key=st.session_state.api_key)

    user_question = st.text_input("Ask a question based on the document content:")
    if not user_question:
        return

    response = answer_question(user_question, knowledge_base, model)
    st.subheader("Answer:")
    st.write(response)

    # Store and display previous answers
    if "answers" not in st.session_state:
        st.session_state.answers = []
    history = st.session_state.answers
    # Streamlit reruns this script on every widget interaction, so the same
    # question reaches this point repeatedly. Refresh the latest entry
    # instead of appending a duplicate.
    if history and history[-1][0] == user_question:
        history[-1] = (user_question, response)
    else:
        history.append((user_question, response))

    if show_answers:
        st.subheader("Previous Answers:")
        for question, answer in history:
            st.write(f"Question: {question}")
            st.write(f"Answer: {answer}")
            st.write("------")
# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    main()