import json

import chainlit as cl
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.memory import ChatMessageHistory
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema.runnable import Runnable
from langchain.schema.runnable.config import RunnableConfig
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

from retriever import fetch_retriever_or_load_local_retriever

# To run locally: chainlit run app.py -w
# Architecture diagram:
# https://python.langchain.com/v0.1/assets/images/conversational_retrieval_chain-5c7a96abe29e582bc575a0a0d63f86b0.png

llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)

# Load the local FAISS retriever with preloaded embeddings.
retriever = fetch_retriever_or_load_local_retriever()

### Contextualize question ###
contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""
contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)
# Also handles the case where chat_history is empty, and otherwise applies
# prompt | llm | StrOutputParser() | retriever in sequence.
history_aware_retriever = create_history_aware_retriever(
    llm, retriever, contextualize_q_prompt
)

### Answer question ###
qa_system_prompt = """You're an assistant that gives concise and eloquent answers to \
questions about movies and films. Use the following pieces of retrieved context to \
answer the question. Use three sentences maximum and keep the answer concise.
{context}""" qa_prompt = ChatPromptTemplate.from_messages( [ ("system", qa_system_prompt), MessagesPlaceholder("chat_history"), ("human", "{input}"), ] ) # chain to accept the retrieved context alongside the conversation history and query to generate an answer question_answer_chain = create_stuff_documents_chain(llm, qa_prompt) # This chain applies the history_aware_retriever and question_answer_chain in sequence, retaining intermediate outputs such as the retrieved context for convenience rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain) ### Statefully manage chat history store = {} def get_session_history(session_id: str) -> BaseChatMessageHistory: if session_id not in store: store[session_id] = ChatMessageHistory() return store[session_id] @cl.on_chat_start async def on_chat_start(): conversational_rag_chain = RunnableWithMessageHistory( rag_chain, get_session_history, input_messages_key="input", history_messages_key="chat_history", output_messages_key="answer", ) cl.user_session.set("runnable", conversational_rag_chain) @cl.on_message async def on_message(message: cl.Message): runnable = cl.user_session.get("runnable") # type: Runnable msg = cl.Message(content="") async for chunk in runnable.astream( {"input": message.content}, config=RunnableConfig( callbacks=[cl.LangchainCallbackHandler()], configurable={"session_id": cl.user_session.get("id")}, ), ): # process Documents to be JSON serializable and passed into the context window but not served up as part of the tokened response if "context" in chunk: docs = chunk["context"] docs_dict = [ {"page_content": doc.page_content, "metadata": doc.metadata} for doc in docs ] chunk["context"] = json.dumps(docs_dict) if "answer" in chunk: await msg.stream_token(chunk["answer"]) await msg.send()