import os
import json
import re

import gradio as gr
import requests
from duckduckgo_search import DDGS
from typing import List
from pydantic import BaseModel, Field
from tempfile import NamedTemporaryFile

from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from llama_parse import LlamaParse
from langchain_core.documents import Document
from huggingface_hub import InferenceClient

# Credentials for the Hugging Face, LlamaParse, and Cloudflare backends are
# read from the environment; nothing is hard-coded.
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")
llama_cloud_api_key = os.environ.get("LLAMA_CLOUD_API_KEY")
CLOUDFLARE_ACCOUNT_ID = os.environ.get("CLOUDFLARE_ACCOUNT_ID")
CLOUDFLARE_AUTH_TOKEN = os.environ.get("CLOUDFLARE_AUTH_TOKEN")
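
# A minimal startup check -- an addition, not part of the original flow: warn
# early if a credential is missing, since each backend otherwise fails later
# with a less obvious error.
for _name, _value in [
    ("HUGGINGFACE_TOKEN", huggingface_token),
    ("LLAMA_CLOUD_API_KEY", llama_cloud_api_key),
    ("CLOUDFLARE_ACCOUNT_ID", CLOUDFLARE_ACCOUNT_ID),
    ("CLOUDFLARE_AUTH_TOKEN", CLOUDFLARE_AUTH_TOKEN),
]:
    if not _value:
        print(f"Warning: environment variable {_name} is not set.")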

MODELS = [
    "microsoft/Phi-3-mini-4k-instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "cloudflare/llama-3.1-8b-instruct",
]

llama_parser = LlamaParse(
    api_key=llama_cloud_api_key,
    result_type="markdown",
    num_workers=4,
    verbose=True,
    language="en",
)


def load_document(file: NamedTemporaryFile, parser: str = "llamaparse") -> List[Document]:
    """Loads and splits the document into pages using the selected parser."""
    if parser == "pypdf":
        loader = PyPDFLoader(file.name)
        return loader.load_and_split()
    elif parser == "llamaparse":
        try:
            documents = llama_parser.load_data(file.name)
            return [Document(page_content=doc.text, metadata={"source": file.name}) for doc in documents]
        except Exception as e:
            # LlamaParse is a cloud service and can fail (network, quota, auth);
            # fall back to local parsing so the upload still succeeds.
            print(f"Error using Llama Parse: {str(e)}")
            print("Falling back to PyPDF parser")
            loader = PyPDFLoader(file.name)
            return loader.load_and_split()
    else:
        raise ValueError("Invalid parser specified. Use 'pypdf' or 'llamaparse'.")


def get_embeddings():
    return HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")


def update_vectors(files, parser):
    """Parses the uploaded PDFs and adds them to a persistent FAISS index."""
    if not files:
        return "Please upload at least one PDF file."

    embed = get_embeddings()
    total_chunks = 0

    all_data = []
    for file in files:
        data = load_document(file, parser)
        all_data.extend(data)
        # Each parsed page counts as one chunk here.
        total_chunks += len(data)

    # Append to an existing on-disk index if present; otherwise build a new one.
    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
        database.add_documents(all_data)
    else:
        database = FAISS.from_documents(all_data, embed)

    database.save_local("faiss_database")

    return f"Vector store updated successfully. Processed {total_chunks} chunks from {len(files)} files using {parser}."


def generate_chunked_response(prompt, model, max_tokens=1000, max_chunks=5, temperature=0.2):
    """Streams a completion from the selected model and returns the full text."""
    # The Cloudflare model is served through Cloudflare's own REST API rather
    # than the Hugging Face Inference API.
    if model == "cloudflare/llama-3.1-8b-instruct":
        return generate_cloudflare_response(prompt, max_tokens, temperature)

    # NOTE: max_chunks is not currently used.
    client = InferenceClient(
        model,
        token=huggingface_token,
    )

    full_response = ""
    messages = [{"role": "user", "content": prompt}]

    try:
        for message in client.chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            stream=True,
        ):
            chunk = message.choices[0].delta.content
            if chunk:
                full_response += chunk
    except Exception as e:
        print(f"Error in generating response: {str(e)}")

    # Strip any echoed instruction wrapper and context preamble from the output.
    clean_response = re.sub(r'<s>\[INST\].*?\[/INST\]\s*', '', full_response, flags=re.DOTALL)
    clean_response = clean_response.replace("Using the following context:", "").strip()
    clean_response = clean_response.replace("Using the following context from the PDF documents:", "").strip()

    return clean_response
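
# Example usage (assumes HUGGINGFACE_TOKEN grants access to the chosen model):
#
#     text = generate_chunked_response("Say hello in one sentence.", MODELS[0])
#     print(text)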


def generate_cloudflare_response(prompt, max_tokens, temperature):
    """Calls Cloudflare Workers AI and returns the response text or an error string."""
    try:
        response = requests.post(
            f"https://api.cloudflare.com/client/v4/accounts/{CLOUDFLARE_ACCOUNT_ID}/ai/run/@cf/meta/llama-3.1-8b-instruct",
            headers={"Authorization": f"Bearer {CLOUDFLARE_AUTH_TOKEN}"},
            json={
                "messages": [
                    {"role": "system", "content": "You are a friendly assistant"},
                    {"role": "user", "content": prompt}
                ],
                "max_tokens": max_tokens,
                "temperature": temperature
            }
        )

        response.raise_for_status()

        # Validate the expected {"result": {"response": ...}} payload shape.
        result = response.json()
        if not result:
            raise ValueError("Empty response from Cloudflare API")
        if 'result' not in result:
            raise ValueError(f"Unexpected response format. 'result' key missing. Response: {result}")
        if 'response' not in result['result']:
            raise ValueError(f"Unexpected response format. 'response' key missing. Result: {result['result']}")

        return result['result']['response']

    except requests.exceptions.RequestException as e:
        error_message = f"Network error when calling Cloudflare API: {str(e)}"
        print(error_message)
        return f"Error: {error_message}"
    except json.JSONDecodeError as e:
        error_message = f"Error decoding JSON response from Cloudflare API: {str(e)}"
        print(error_message)
        return f"Error: {error_message}"
    except ValueError as e:
        error_message = str(e)
        print(error_message)
        return f"Error: {error_message}"
    except Exception as e:
        error_message = f"Unexpected error in generate_cloudflare_response: {str(e)}"
        print(error_message)
        return f"Error: {error_message}"


def duckduckgo_search(query):
    """Returns up to five DuckDuckGo text-search results for the query."""
    with DDGS() as ddgs:
        results = ddgs.text(query, max_results=5)
    return results
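

# ddgs.text() can come back empty or raise on network errors; a small
# defensive wrapper, sketched here as an optional addition that the handlers
# below do not use as written:
def safe_duckduckgo_search(query):
    try:
        return duckduckgo_search(query) or []
    except Exception as e:
        print(f"Search failed: {e}")
        return []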


class CitingSources(BaseModel):
    sources: List[str] = Field(
        ...,
        description="List of sources to cite. Each should be a URL of the source."
    )
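
# CitingSources is not referenced elsewhere in this file. A minimal example of
# how it could validate a model's cited sources (illustrative only):
#
#     cited = CitingSources(sources=["https://example.com/article"])
#     print(cited.sources)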


def get_response_from_pdf(query, model, temperature=0.2):
    """Answers a question from the locally indexed PDF documents."""
    embed = get_embeddings()
    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
    else:
        return "No documents available. Please upload PDF documents to answer questions."

    # Retrieve the most relevant chunks and concatenate them into the prompt.
    retriever = database.as_retriever()
    relevant_docs = retriever.get_relevant_documents(query)
    context_str = "\n".join([doc.page_content for doc in relevant_docs])

    prompt = f"""<s>[INST] Using the following context from the PDF documents:
{context_str}
Write a detailed and complete response that answers the following user question: '{query}'
Do not include a list of sources in your response. [/INST]"""

    generated_text = generate_chunked_response(prompt, model, temperature=temperature)

    # Strip any echoed instruction wrapper and context preamble.
    clean_text = re.sub(r'<s>\[INST\].*?\[/INST\]\s*', '', generated_text, flags=re.DOTALL)
    clean_text = clean_text.replace("Using the following context from the PDF documents:", "").strip()

    return clean_text


def get_response_with_search(query, model, temperature=0.2):
    """Answers a question using DuckDuckGo search results as context."""
    search_results = duckduckgo_search(query)
    context = "\n".join(f"{result['title']}\n{result['body']}\nSource: {result['href']}\n"
                        for result in search_results if 'body' in result)

    prompt = f"""<s>[INST] Using the following context:
{context}
Write a detailed and complete research document that fulfills the following user request: '{query}'
After writing the document, please provide a list of sources used in your response. [/INST]"""

    generated_text = generate_chunked_response(prompt, model, temperature=temperature)

    # Strip any echoed instruction wrapper and context preamble.
    clean_text = re.sub(r'<s>\[INST\].*?\[/INST\]\s*', '', generated_text, flags=re.DOTALL)
    clean_text = clean_text.replace("Using the following context:", "").strip()

    # Split the answer from the trailing "Sources:" section, if the model added one.
    parts = clean_text.split("Sources:", 1)
    main_content = parts[0].strip()
    sources = parts[1].strip() if len(parts) > 1 else ""

    return main_content, sources


def chatbot_interface(message, history, use_web_search, model, temperature):
    """Routes the message to web search or PDF chat and appends the reply to history."""
    if not message.strip():
        return history

    if use_web_search:
        main_content, sources = get_response_with_search(message, model, temperature)
        formatted_response = f"{main_content}\n\nSources:\n{sources}"
    else:
        formatted_response = get_response_from_pdf(message, model, temperature)

    # If the same message was just resubmitted, update the last exchange in
    # place; otherwise append a new (message, response) pair.
    if history and history[-1][0] == message:
        history[-1] = (message, formatted_response)
    else:
        history.append((message, formatted_response))

    return history


def clear_and_update_chat(message, history, use_web_search, model, temperature):
    updated_history = chatbot_interface(message, history, use_web_search, model, temperature)
    # Return an empty string to clear the input textbox alongside the new history.
    return "", updated_history


with gr.Blocks() as demo:
    # Per-session flag used to reject submissions that arrive mid-generation.
    is_generating = gr.State(False)

    def protected_clear_and_update_chat(message, history, use_web_search, model, temperature, is_generating):
        # If a response is already being generated, return the inputs unchanged.
        if is_generating:
            return message, history, is_generating
        is_generating = True
        updated_message, updated_history = clear_and_update_chat(message, history, use_web_search, model, temperature)
        is_generating = False
        return updated_message, updated_history, is_generating

    gr.Markdown("# AI-powered Web Search and PDF Chat Assistant")

    with gr.Row():
        file_input = gr.Files(label="Upload your PDF documents", file_types=[".pdf"])
        parser_dropdown = gr.Dropdown(choices=["pypdf", "llamaparse"], label="Select PDF Parser", value="llamaparse")
        update_button = gr.Button("Upload Document")

    update_output = gr.Textbox(label="Update Status")
    update_button.click(update_vectors, inputs=[file_input, parser_dropdown], outputs=update_output)

    chatbot = gr.Chatbot(label="Conversation")
    msg = gr.Textbox(label="Ask a question")
    use_web_search = gr.Checkbox(label="Use Web Search", value=False)

    with gr.Row():
        model_dropdown = gr.Dropdown(choices=MODELS, label="Select Model", value=MODELS[2])
        temperature_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.2, step=0.1, label="Temperature")

    submit = gr.Button("Submit")

    gr.Examples(
        examples=[
            ["What are the latest developments in AI?"],
            ["Tell me about recent updates on GitHub"],
            ["What are the best hotels in Galapagos, Ecuador?"],
            ["Summarize recent advancements in Python programming"],
        ],
        inputs=msg,
    )

    # The Submit button and pressing Enter in the textbox trigger the same
    # guarded handler.
    submit.click(protected_clear_and_update_chat,
                 inputs=[msg, chatbot, use_web_search, model_dropdown, temperature_slider, is_generating],
                 outputs=[msg, chatbot, is_generating])
    msg.submit(protected_clear_and_update_chat,
               inputs=[msg, chatbot, use_web_search, model_dropdown, temperature_slider, is_generating],
               outputs=[msg, chatbot, is_generating])

    gr.Markdown(
        """
    ## How to use
    1. Upload PDF documents using the file input at the top.
    2. Select the PDF parser (pypdf or llamaparse) and click "Upload Document" to update the vector store.
    3. Ask questions in the textbox.
    4. Toggle "Use Web Search" to switch between PDF chat and web search.
    5. Adjust the Temperature slider to fine-tune response generation.
    6. Click "Submit" or press Enter to get a response.
    """
    )


if __name__ == "__main__":
    demo.launch(share=True)