"""Gradio app: summarize an earnings report's MD&A section and ask questions about it.

Loads pre-scraped reports from ``data/reports.pickle``, lets the user pick one
(deep-linkable via a ``?report=`` query-string parameter), then uses
LangChain + OpenAI either to map-reduce-summarize the MD&A text or to answer
free-form questions against a Chroma vector store built from its chunks.
"""
from functools import cache
import os

import gradio as gr
import pandas as pd
import tiktoken
from langchain.chains.question_answering import load_qa_chain
from langchain.chains.summarize import load_summarize_chain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma


@cache
def tiktoken_len_builder(model_name):
    """Return a function that measures text length in tokens for *model_name*.

    Cached so the (comparatively slow) tokenizer lookup happens once per model.
    """
    tokenizer = tiktoken.encoding_for_model(model_name)

    def token_len(text):
        # disallowed_special=() makes special-token text count as plain text
        # instead of raising, so arbitrary filing text can be measured.
        return len(tokenizer.encode(text, disallowed_special=()))

    return token_len


def split_documents(docs, length_function, chunk_size=400):
    """Split *docs* into overlapping chunks sized by *length_function* (tokens).

    chunk_size/chunk_overlap are measured by ``length_function``, i.e. in
    tokens when a tiktoken-based counter is passed.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=20,
        length_function=length_function,
    )
    return text_splitter.split_documents(docs)


def summarize_docs(llm, docs):
    """Summarize *docs* with a map-reduce chain and return the summary text."""
    chain = load_summarize_chain(llm, chain_type="map_reduce")
    return chain.run(docs)


class MdnaQA:
    """Question answering over MD&A chunks: similarity search + a "stuff" chain."""

    def __init__(self, llm, docs):
        self.docs = docs
        self.chain = load_qa_chain(llm, chain_type="stuff")
        # Reuse the same API key the LLM was configured with.
        embeddings = OpenAIEmbeddings(openai_api_key=llm.openai_api_key)
        self.docsearch = Chroma.from_documents(docs, embeddings)

    def ask(self, question):
        """Answer *question* using the most similar stored chunks as context."""
        input_documents = self.docsearch.similarity_search(question)
        return self.chain.run(input_documents=input_documents, question=question)


reports = pd.read_pickle("data/reports.pickle")
# Human-readable dropdown label, e.g. "AAPL 2023 Q1 10-Q".
reports["label"] = reports.apply(
    lambda report: f"{report.ticker} {report.report_date.year} Q{report.report_date.quarter} {report.form}",
    axis=1,
)

model_name = "text-davinci-003"
tiktoken_len = tiktoken_len_builder(model_name)

title = "Summarize Earnings Report & Ask Custom Questions"

# JS run on page load: read the query string into the hidden url_params JSON.
get_window_url_params = """
function(report, url_params) {
    const params = new URLSearchParams(window.location.search);
    url_params = Object.fromEntries(params);
    return [report, url_params];
}
"""

# JS run on dropdown change: write the selection back into the query string so
# the current report is deep-linkable.
set_window_url_params = """
function(report, url_params) {
    const params = new URLSearchParams(window.location.search);
    params.set("report", report)
    url_params = Object.fromEntries(params);
    const queryString = '?' + params.toString();
    window.history.replaceState(null, null, queryString);
    // this next line is only needed inside Spaces, so the child frame updates parent
    window.parent.postMessage({ queryString: queryString }, "*")
    return [report, url_params];
}
"""

report_choices = reports["label"].tolist()

with gr.Blocks(title=title) as block:
    gr.Markdown(f"# {title}")
    gr.Markdown("## [Earning Report Insights Blog Posts](https://blog.experienced.dev)")
    # Hidden JSON component that mirrors the page's query-string parameters.
    url_params = gr.JSON({}, visible=False, label="URL Params")
    report = gr.Dropdown(
        label="Select a report",
        choices=report_choices,
        value=report_choices[0],
        interactive=True,
    )

    def on_report_change(report, url_params):
        # Python side is a pass-through; the real work happens in the
        # set_window_url_params JS attached to the same event.
        return [report, url_params]

    report.change(
        on_report_change,
        inputs=[report, url_params],
        outputs=[report, url_params],
        _js=set_window_url_params,
    )

    # Holds the token-chunked MD&A documents for the selected report.
    mdna = gr.State()

    def get_mdna(report):
        """Load the selected report's MD&A, chunk it, and show the token cost."""
        # Single .loc lookup (row mask + column label) instead of chained
        # indexing through an intermediate frame.
        text = reports.loc[reports["label"] == report, "mdna"].iat[0]
        documents = [Document(page_content=text)]
        chunks = split_documents(documents, tiktoken_len)
        tokens_sum = sum(tiktoken_len(d.page_content) for d in chunks)
        return [chunks, gr.Textbox.update(value=str(tokens_sum))]

    def select_report(report, url_params):
        # Prefer a report named in the URL (deep link) over the dropdown default.
        if "report" in url_params:
            report = url_params["report"]
        return [report, url_params]

    block.load(
        fn=select_report,
        inputs=[report, url_params],
        outputs=[report, url_params],
        _js=get_window_url_params,
    )

    gr.Markdown(
        "You can get an API key [from OpenAI](https://platform.openai.com/account/api-keys)"
    )
    openai_api_key = gr.Text(
        value=os.getenv("OPENAI_API_KEY"),
        type="password",
        label="OpenAI API key",
    )
    temperature = gr.Slider(
        0,
        2,
        value=0,
        step=0.1,
        label="Temperature",
        info="adjusts a model's output from predictable to random",
    )
    tokens_total = gr.Textbox(
        label="Total input tokens",
        value=0,
        info="how many tokens will be spent on input / embeddings",
    )
    report.change(
        get_mdna,
        inputs=report,
        outputs=[mdna, tokens_total],
    )

    with gr.Tabs(visible=True) as tabs:
        with gr.TabItem("Summary"):
            summarize = gr.Button(
                "Summarize MD&A",
                variant="primary",
                # NOTE(review): `info` is not a documented gr.Button kwarg;
                # kept because extra kwargs are absorbed harmlessly — confirm
                # against the installed gradio version.
                info="On click you spent tokens on input, instructions and output",
            )
            summary = gr.TextArea(label="Summary")

            def summarize_mdna(docs, api_key, temp):
                """Summarize the chunked MD&A with a freshly configured LLM."""
                llm = OpenAI(temperature=temp, openai_api_key=api_key)
                return summarize_docs(llm, docs)

            summarize.click(
                summarize_mdna,
                inputs=[mdna, openai_api_key, temperature],
                outputs=[summary],
            )
        with gr.TabItem("QA with MD&A"):
            start_qa = gr.Button("Start QA with MD&A", variant="primary")
            chatbot = gr.Chatbot(label="QA with MD&A", visible=False)
            question = gr.Textbox(
                label="Your question", interactive=True, visible=False
            )
            # Holds the MdnaQA session once the user starts the chat.
            qa_chat = gr.State()
            send = gr.Button("Ask question", variant="primary", visible=False)

            def start_chat(docs, api_key, temp):
                """Build the QA session and reveal the hidden chat widgets."""
                llm = OpenAI(temperature=temp, openai_api_key=api_key)
                session = MdnaQA(llm, docs)
                return (
                    session,
                    # Use the matching component's update for the Chatbot
                    # (same generic payload, correct class).
                    gr.Chatbot.update(visible=True),
                    gr.Textbox.update(visible=True),
                    gr.Button.update(visible=True),
                )

            start_qa.click(
                start_chat,
                [mdna, openai_api_key, temperature],
                [qa_chat, chatbot, question, send],
            )

            def respond(qa_chat, question, chat_history):
                """Answer one question and append the (question, answer) turn."""
                answer = qa_chat.ask(question)
                chat_history.append((question, answer))
                # Clear the textbox, return the updated history to the Chatbot.
                return "", chat_history

            send.click(respond, [qa_chat, question, chatbot], [question, chatbot])
            question.submit(respond, [qa_chat, question, chatbot], [question, chatbot])

if __name__ == "__main__":
    # Guarded so importing this module (e.g. in tests) does not start a server;
    # identical behavior when executed as a script.
    block.launch()