import logging
import os

import chainlit as cl
from datasets import load_dataset
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# Configure logging so the INFO-level messages below are actually emitted
# (the root logger defaults to WARNING).
logging.basicConfig(level=logging.INFO)

logging.info("""
=================================================================================
STARTING
=================================================================================
""")

# Download the dataset and save it as a CSV file if it doesn't exist yet.
csv_path = "./imdb.csv"
if not os.path.exists(csv_path):
    dataset = load_dataset("ShubhamChoksi/IMDB_Movies")
    dataset["train"].to_csv(csv_path)

loader = CSVLoader(file_path=csv_path)
data = loader.load()

# Split the documents into overlapping chunks.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
chunked_documents = text_splitter.split_documents(data)

# Embed the chunks, caching embeddings on disk so repeated runs don't re-embed.
embedding_model = OpenAIEmbeddings()
store = LocalFileStore("./cache/")
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
    embedding_model, store, namespace=embedding_model.model
)

# Build the FAISS vector store, or load it if an index was saved earlier.
index_path = "faiss_index"
if os.path.exists(index_path):
    vector_store = FAISS.load_local(
        index_path, cached_embedder, allow_dangerous_deserialization=True
    )
    logging.info("Vector store loaded from saved index.")
else:
    vector_store = FAISS.from_documents(chunked_documents, cached_embedder)
    logging.info("Vector store created from documents.")
    vector_store.save_local(index_path)
    logging.info("Vector store saved locally.")


@cl.on_chat_start
async def on_chat_start():
    logging.info("""
=================================================================================
ON CHAT START
=================================================================================
""")

    prompt_template = ChatPromptTemplate.from_template(
        """You are an AI agent that suggests movies to people.
Answer the question based on the context below.
If the question cannot be answered from the context, reply with "Sorry, I cannot help you".

Context: {context}

Question: {question}
"""
    )

    retriever = vector_store.as_retriever()
    chat_model = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)
    parser = StrOutputParser()

    # Assemble the RAG chain: retrieve context for the question, fill the
    # prompt, call the chat model, and parse the output to plain text.
    runnable = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt_template
        | chat_model
        | parser
    )
    cl.user_session.set("runnable", runnable)


@cl.on_message
async def on_message(message: cl.Message):
    logging.info(f"""
=================================================================================
ON MESSAGE: {message.content}
=================================================================================
""")

    runnable = cl.user_session.get("runnable")
    await cl.Message(content="Lemme see what I can do for you...").send()

    # Stream the model's answer token by token into a single response message.
    msg = cl.Message(content="")
    async for chunk in runnable.astream(
        message.content,
        config=RunnableConfig(callbacks=[cl.LangchainCallbackHandler()]),
    ):
        logging.info(f"Received chunk <{chunk}>")
        await msg.stream_token(chunk)

    logging.info("Sending message")
    await msg.send()
    logging.info(f"Done with <{message.content}>")
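# To run this app locally (a sketch, assuming this file is saved as app.py
# and OPENAI_API_KEY is set in the environment), use Chainlit's CLI:
#
#   chainlit run app.py -w
#
# The -w flag watches the file and reloads the app on changes.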