import os
import zipfile
from typing import Dict, List, Optional, Union

import gradio as gr
from groq import Groq
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_groq import ChatGroq
from langchain_huggingface import HuggingFaceEmbeddings

# Retrieve API key for Groq from the environment variables
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")

# Initialize the Groq client
client = Groq(api_key=GROQ_API_KEY)

# Initialize the LLM
llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", api_key=GROQ_API_KEY)

# Initialize the embedding model
embed_model = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")

# General constants for the UI
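# NOTE: The definitions below are a reconstructed, minimal sketch. TITLE's
# HTML, AVATAR_IMAGES, TEXT_EXTENSIONS, text_splitter, rag_prompt, the two
# extract_* helpers, and the start of upload_files are assumptions inferred
# from how these names are used later in the script.

TITLE = """<h1 align="center">📚 Llama 4 RAG</h1>"""  # placeholder banner (assumed)

# Avatar images for the chatbot (user, assistant); None keeps Gradio's defaults
AVATAR_IMAGES = (None, None)

# Plain-text extensions accepted for upload; .docx parsing (e.g. via
# python-docx) is omitted from this sketch even though the UploadButton accepts it
TEXT_EXTENSIONS = [".txt", ".md", ".py", ".html", ".css", ".js", ".json", ".csv"]

# Global app state: extracted file contents, the vector store, and the RAG chain
EXTRACTED_FILES: Dict[str, Dict[str, str]] = {}
VECTORSTORE = None
RAG_CHAIN = None

# Splitter used to chunk documents before embedding (assumed sizes)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)

# Prompt that grounds the LLM in the retrieved context (assumed wording)
rag_prompt = PromptTemplate.from_template(
    "Answer the question using only the context below. "
    "If the context is not sufficient, say that you don't know.\n\n"
    "Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"
)


def extract_text_from_zip(zip_path: str) -> Dict[str, str]:
    """Extract the text content of every supported file inside a ZIP archive."""
    extracted: Dict[str, str] = {}
    with zipfile.ZipFile(zip_path, "r") as zf:
        for name in zf.namelist():
            if os.path.splitext(name)[1].lower() in TEXT_EXTENSIONS:
                try:
                    extracted[name] = zf.read(name).decode("utf-8", errors="ignore")
                except Exception:
                    continue  # skip unreadable archive members
    return extracted


def extract_text_from_single_file(file_path: str) -> Dict[str, str]:
    """Read a single text file and return a {filename: content} mapping."""
    filename = os.path.basename(file_path)
    if os.path.splitext(filename)[1].lower() not in TEXT_EXTENSIONS:
        return {}
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return {filename: f.read()}
    except Exception:
        return {}


def upload_files(files: Optional[List[str]], chatbot: List[gr.ChatMessage]):
    """
    Extract text from the uploaded files, report the result in the chat, and
    (re)build the vector store and RAG chain over everything uploaded so far.
    """
    global EXTRACTED_FILES, VECTORSTORE, RAG_CHAIN

    if not files:
        return chatbot

    # Handle multiple file uploads
    if len(files) > 1:
        total_files_processed = 0
        total_files_extracted = 0
        uploaded_names = []

        for file in files:
            filename = os.path.basename(file)
            file_ext = os.path.splitext(filename)[1].lower()

            if file_ext == ".zip":
                extracted_files = extract_text_from_zip(file)
            else:
                extracted_files = extract_text_from_single_file(file)

            if extracted_files:
                total_files_processed += 1
                total_files_extracted += len(extracted_files)
                EXTRACTED_FILES[filename] = extracted_files
                uploaded_names.append(filename)

        # Kept generic so process_query's "Multiple files uploaded" check matches
        file_types_str = "files"
        file_list = "\n".join(f"- {name}" for name in uploaded_names)

        chatbot.append(
            gr.ChatMessage(
                role="user",
                content=f"""\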
📚 Multiple {file_types_str} uploaded ({total_files_processed} files)
Extracted {total_files_extracted} text file(s) in total
Uploaded files:
{file_list}""",
            )
        )

    # Handle single file upload
    elif len(files) == 1:
        file = files[0]
        filename = os.path.basename(file)
        file_ext = os.path.splitext(filename)[1].lower()

        # Process based on file type
        if file_ext == ".zip":
            extracted_files = extract_text_from_zip(file)
            file_type_msg = "📦 ZIP file"
        else:
            extracted_files = extract_text_from_single_file(file)
            file_type_msg = "📄 File"

        if not extracted_files:
            chatbot.append(
                gr.ChatMessage(
                    role="user",
                    content=f"""\
{file_type_msg} uploaded: {filename}, but no text content was found or the file format is not supported.""",
                )
            )
        else:
            file_list = "\n".join([f"- {name}" for name in extracted_files.keys()])
            chatbot.append(
                gr.ChatMessage(
                    role="user",
                    content=f"""{file_type_msg} uploaded: {filename}
Extracted {len(extracted_files)} text file(s):
{file_list}""",
                )
            )

            # Store the extracted content in the global variable
            EXTRACTED_FILES[filename] = extracted_files

    # Process the extracted files and create vector embeddings
    if EXTRACTED_FILES:
        # Prepare documents for processing
        all_texts = []
        for extracted in EXTRACTED_FILES.values():
            for file_path, content in extracted.items():
                all_texts.append(
                    {"page_content": content, "metadata": {"source": file_path}}
                )

        # Create document objects
        documents = [
            Document(page_content=item["page_content"], metadata=item["metadata"])
            for item in all_texts
        ]

        # Split the documents into chunks
        chunks = text_splitter.split_documents(documents)

        # Create the vector store
        VECTORSTORE = InMemoryVectorStore.from_documents(
            documents=chunks,
            embedding=embed_model,
        )

        # Create the retriever
        retriever = VECTORSTORE.as_retriever()

        # Create the RAG chain: the retriever fills {context}, RunnablePassthrough
        # forwards the raw question, then prompt -> LLM -> plain string
        RAG_CHAIN = (
            {"context": retriever, "question": RunnablePassthrough()}
            | rag_prompt
            | llm
            | StrOutputParser()
        )

        # Add a confirmation message
        chatbot.append(
            gr.ChatMessage(
                role="assistant",
                content="Documents processed and indexed. You can now ask questions about the content.",
            )
        )

    return chatbot


def user(text_prompt: str, chatbot: List[gr.ChatMessage]):
    """
    Append a new user text message to the chat history.

    Parameters:
        text_prompt (str): The input text provided by the user.
        chatbot (List[gr.ChatMessage]): The existing conversation history.

    Returns:
        Tuple[str, List[gr.ChatMessage]]: A tuple of an empty string (clearing
        the prompt) and the updated conversation history.
    """
    if text_prompt:
        chatbot.append(gr.ChatMessage(role="user", content=text_prompt))
    return "", chatbot


def get_message_content(msg):
    """
    Retrieve the content of a message that can be either a dictionary or a
    gr.ChatMessage.

    Parameters:
        msg (Union[dict, gr.ChatMessage]): The message object.

    Returns:
        str: The textual content of the message.
    """
    if isinstance(msg, dict):
        return msg.get("content", "")
    return msg.content


def process_query(chatbot: List[Union[dict, gr.ChatMessage]]):
    """
    Process the user's query using the RAG pipeline.

    Parameters:
        chatbot (List[Union[dict, gr.ChatMessage]]): The conversation history.

    Returns:
        List[Union[dict, gr.ChatMessage]]: The updated conversation history
        with the response.
    """
    global RAG_CHAIN

    if len(chatbot) == 0:
        chatbot.append(
            gr.ChatMessage(
                role="assistant",
                content="Please enter a question or upload documents to start the conversation.",
            )
        )
        return chatbot

    # Get the last user message as the prompt
    user_messages = [
        msg
        for msg in chatbot
        if (isinstance(msg, dict) and msg.get("role") == "user")
        or (hasattr(msg, "role") and msg.role == "user")
    ]

    if not user_messages:
        chatbot.append(
            gr.ChatMessage(
                role="assistant",
                content="Please enter a question to start the conversation.",
            )
        )
        return chatbot

    last_user_msg = user_messages[-1]
    prompt = get_message_content(last_user_msg)

    # Skip if the last message was about uploading a file
    if (
        "📦 ZIP file uploaded:" in prompt
        or "📄 File uploaded:" in prompt
        or "📚 Multiple files uploaded" in prompt
    ):
        return chatbot

    # Check if the RAG chain is available
    if RAG_CHAIN is None:
        chatbot.append(
            gr.ChatMessage(
                role="assistant",
                content="Please upload documents first to enable question answering.",
            )
        )
        return chatbot

    # Append a placeholder for the assistant's response
    chatbot.append(gr.ChatMessage(role="assistant", content="Thinking..."))

    try:
        # Process the query through the RAG chain
        response = RAG_CHAIN.invoke(prompt)

        # Update the placeholder with the actual response
        chatbot[-1].content = response
    except Exception as e:
        # Handle any errors
        chatbot[-1].content = f"Error processing your query: {str(e)}"

    return chatbot


def reset_app(chatbot):
    """
    Reset the app by clearing the chat context and removing any uploaded files.

    Parameters:
        chatbot (List[Union[dict, gr.ChatMessage]]): The conversation history.

    Returns:
        List[Union[dict, gr.ChatMessage]]: A fresh conversation history.
    """
    global EXTRACTED_FILES, VECTORSTORE, RAG_CHAIN

    # Clear the global variables
    EXTRACTED_FILES = {}
    VECTORSTORE = None
    RAG_CHAIN = None

    # Reset the chatbot with a welcome message
    return [
        gr.ChatMessage(
            role="assistant",
            content="App has been reset. "
            "You can start a new conversation or upload new documents.",
        )
    ]


# Define the Gradio UI components
chatbot_component = gr.Chatbot(
    label="Llama 4 RAG",
    type="messages",
    bubble_full_width=False,
    avatar_images=AVATAR_IMAGES,
    scale=2,
    height=350,
)
text_prompt_component = gr.Textbox(
    placeholder="Ask a question about your documents...",
    show_label=False,
    autofocus=True,
    scale=28,
)
upload_files_button_component = gr.UploadButton(
    label="Upload",
    file_count="multiple",
    file_types=[".zip", ".docx"] + TEXT_EXTENSIONS,
    scale=1,
    min_width=80,
)
send_button_component = gr.Button(
    value="Send", variant="primary", scale=1, min_width=80
)
reset_button_component = gr.Button(value="Reset", variant="stop", scale=1, min_width=80)

# Define input lists for button chaining
user_inputs = [text_prompt_component, chatbot_component]

with gr.Blocks(theme=gr.themes.Ocean()) as demo:
    gr.HTML(TITLE)
    with gr.Column():
        chatbot_component.render()
        with gr.Row(equal_height=True):
            text_prompt_component.render()
            send_button_component.render()
            upload_files_button_component.render()
            reset_button_component.render()

    # When the Send button is clicked, first process the user text, then process the query
    send_button_component.click(
        fn=user,
        inputs=user_inputs,
        outputs=[text_prompt_component, chatbot_component],
        queue=False,
    ).then(
        fn=process_query,
        inputs=[chatbot_component],
        outputs=[chatbot_component],
        api_name="process_query",
    )

    # Allow submission using the Enter key
    text_prompt_component.submit(
        fn=user,
        inputs=user_inputs,
        outputs=[text_prompt_component, chatbot_component],
        queue=False,
    ).then(
        fn=process_query,
        inputs=[chatbot_component],
        outputs=[chatbot_component],
        api_name="process_query_submit",
    )

    # Handle file uploads
    upload_files_button_component.upload(
        fn=upload_files,
        inputs=[upload_files_button_component, chatbot_component],
        outputs=[chatbot_component],
        queue=False,
    )

    # Handle Reset button clicks
    reset_button_component.click(
        fn=reset_app,
        inputs=[chatbot_component],
        outputs=[chatbot_component],
        queue=False,
    )

# Launch the demo interface
demo.queue().launch()
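# NOTE (assumed, not part of the original source): running this script needs
# GROQ_API_KEY set in the environment and, judging by the imports, roughly
# these packages:
#
#   pip install gradio groq langchain langchain-groq langchain-huggingface
#
# The embedding model ("mixedbread-ai/mxbai-embed-large-v1") is downloaded
# from the Hugging Face Hub on first use.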