Spaces: Runtime error
from fastapi import FastAPI

# from pydantic import BaseModel
# from transformers import pipeline
from txtai.embeddings import Embeddings
from txtai.pipeline import Extractor

from langchain.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain import HuggingFaceHub
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

import pandas as pd
import sqlite3
import os

# NOTE - we configure docs_url to serve the interactive Docs at the root path
# of the app. This way, we can use the docs as a landing page for the app on Spaces.
app = FastAPI(docs_url="/")
# app = FastAPI()

# Token redacted - never commit real API tokens; set HUGGINGFACEHUB_API_TOKEN as a
# Space secret instead of hardcoding it here.
os.environ["HUGGINGFACEHUB_API_TOKEN"] = "hf_..."
# pipe = pipeline("text2text-generation", model="google/flan-t5-small")


# @app.get("/generate")
# def generate(text: str):
#     """
#     Using the text2text-generation pipeline from `transformers`, generate text
#     from the given input text. The model used is `google/flan-t5-small`, which
#     can be found [here](https://huggingface.co/google/flan-t5-small).
#     """
#     output = pipe(text)
#     return {"output": output[0]["generated_text"]}
def load_embeddings(
    domain: str = "",
    db_present: bool = True,
    path: str = "sentence-transformers/all-MiniLM-L6-v2",
    index_name: str = "index",
):
    # Create embeddings model with content support
    embeddings = Embeddings({"path": path, "content": True})

    # If the vector DB is not present, return the unloaded embeddings model
    if not db_present:
        return embeddings
    else:
        if domain == "":
            embeddings.load(index_name)  # change this later
        else:
            print(f"Loading embeddings index for domain: {domain}")
            embeddings.load(f"{index_name}/{domain}")
        return embeddings
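# Illustrative usage sketch (not part of the original app): create a fresh embeddings
# model when no index exists yet, or load a previously saved per-domain index. The
# "docs" domain name below is an assumption for illustration only.
# fresh_embeddings = load_embeddings(db_present=False)
# saved_embeddings = load_embeddings(domain="docs", db_present=True, index_name=f"{os.getcwd()}/index")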
def _check_if_db_exists(db_path: str) -> bool:
    return os.path.exists(db_path)


def _text_splitter(doc):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        length_function=len,
    )
    return text_splitter.transform_documents(doc)


def _load_docs(path: str):
    load_doc = WebBaseLoader(path).load()
    doc = _text_splitter(load_doc)
    return doc
def _stream(dataset, limit, index: int = 0):
    # Yield up to `limit` (id, text, tags) tuples, assigning ids sequentially from `index`.
    # `limit` is treated as a count of yielded rows rather than an absolute id cap, so
    # upserts that start at a high id still receive their full batch.
    count = 0
    for row in dataset:
        if count >= limit:
            break
        yield (index, row.page_content, None)
        index += 1
        count += 1


def _max_index_id(path):
    db = sqlite3.connect(path)

    table = "sections"
    df = pd.read_sql_query(f"select * from {table}", db)
    return {"max_index": df["indexid"].max()}
def _upsert_docs(doc, embeddings, vector_doc_path: str, db_present: bool):
    print(vector_doc_path)
    if db_present:
        print("Existing index found - upserting new documents")
        max_index = _max_index_id(f"{vector_doc_path}/documents")
        print(max_index)
        # Start new ids one past the current maximum so the last existing row is not overwritten
        embeddings.upsert(_stream(doc, 500, max_index["max_index"] + 1))
        print("Embeddings done!!")
        embeddings.save(vector_doc_path)
        print("Embeddings done - 1!!")
    else:
        print("No index found - building a new one")
        embeddings.index(_stream(doc, 500, 0))
        embeddings.save(vector_doc_path)
        max_index = _max_index_id(f"{vector_doc_path}/documents")
        print(max_index)
    # check
    # max_index = _max_index_id(f"{vector_doc_path}/documents")
    # print(max_index)
    return max_index
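# Note (assumption based on txtai's indexing API): _stream yields (id, text, tags)
# tuples, which is the shape Embeddings.index()/upsert() consume, e.g.:
# embeddings.index([(0, "first chunk of text", None), (1, "second chunk", None)])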
# def prompt(question):
#     return f"""Answer the following question using only the context below. Say 'no answer' when the question can't be answered.
#     Question: {question}
#     Context: """


# def search(query, question=None):
#     # Default question to query if empty
#     if not question:
#         question = query
#     return extractor([("answer", query, prompt(question), False)])[0][1]


# @app.get("/rag")
# def rag(question: str):
#     # question = "what is the document about?"
#     answer = search(question)
#     # print(question, answer)
#     return {answer}


# @app.get("/index")
# def get_url_file_path(url_path: str):
#     embeddings = load_embeddings()
#     doc = _load_docs(url_path)
#     embeddings, max_index = _upsert_docs(doc, embeddings)
#     return max_index
def get_domain_file_path(domain: str, file_path: str):
    print(domain, file_path)
    print(os.getcwd())

    bool_value = _check_if_db_exists(db_path=f"{os.getcwd()}/index/{domain}/documents")
    print(bool_value)

    # load_embeddings() already handles both the existing-index and new-index cases,
    # so a single code path replaces what were previously two identical branches
    embeddings = load_embeddings(domain=domain, db_present=bool_value)
    print(embeddings)

    doc = _load_docs(file_path)
    max_index = _upsert_docs(
        doc=doc,
        embeddings=embeddings,
        vector_doc_path=f"{os.getcwd()}/index/{domain}",
        db_present=bool_value,
    )
    # print("Final - output : ", max_index)
    return "Executed Successfully!!"
def _load_embeddings_from_db(
    db_present: bool,
    domain: str,
    # path: str = "sentence-transformers/all-MiniLM-L6-v2",
    path: str = "sentence-transformers/nli-mpnet-base-v2",
):
    # Create embeddings model with content support
    embeddings = Embeddings({"path": path, "content": True})

    # If the vector DB is not present, return the unloaded embeddings model
    if not db_present:
        print("db not present")
        return embeddings
    else:
        if domain == "":
            print("domain empty")
            embeddings.load("index")  # change this later
        else:
            print(f"Loading embeddings index for domain: {domain}")
            embeddings.load(f"{os.getcwd()}/index/{domain}")
        return embeddings
def _prompt(question):
    return f"""Answer the following question using only the context below. Say 'Could not find answer within the context' when the question can't be answered.
    Question: {question}
    Context: """


def _search(query, extractor, question=None):
    # Default question to query if empty
    if not question:
        question = query

    # template = f"""Answer the following question using only the context below. Say 'no answer' when the question can't be answered.
    #     Question: {question}
    #     Context: """
    # prompt = PromptTemplate(template=template, input_variables=["question"])
    # llm_chain = LLMChain(prompt=prompt, llm=extractor)
    # return {"question": question, "answer": llm_chain.run(question)}

    # Call the extractor once and reuse the result instead of running the model twice
    result = extractor([("answer", query, _prompt(question), False)])
    print(result)
    return result[0][1]
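# Note (assumption based on txtai's Extractor API): each tuple passed to the extractor is
# (name, query, question, snippet), and the call returns a list of (name, answer) pairs,
# so [0][1] above pulls out the answer text for the single "answer" task.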
# class ModelOutputEvaluate(BaseModel):
#     question: str
#     answer: str
#     domain: str
#     context: str
class BasePromptContext:
    def __init__(self):
        self.variables_list = ["question", "answer", "context"]
        self.base_template = """Please act as an impartial judge and evaluate the quality of the provided answer which attempts to answer the provided question based on a provided context.
  You'll need to submit your grading for the correctness, comprehensiveness and readability of the answer, using JSON format with the 2 items in parenthesis:
  ("score": [your score number for the correctness of the answer], "reasoning": [your one line step by step reasoning about the correctness of the answer])
  Below is your grading rubric:
  - Correctness: Does the answer correctly answer the question? Below are the details for different scores:
      - Score 0: the answer is completely incorrect, doesn't mention anything about the question or is completely contrary to the correct answer.
          - For example, when asked "How to terminate a databricks cluster", the answer is an empty string, content that's completely irrelevant, or "sorry I don't know the answer".
      - Score 4: the answer provides some relevance to the question and answers one aspect of the question correctly.
          - Example:
              - Question: How to terminate a databricks cluster
              - Answer: Databricks cluster is a cloud-based computing environment that allows users to process big data and run distributed data processing tasks efficiently.
              - Or answer: In the Databricks workspace, navigate to the "Clusters" tab. And then this is a hard question that I need to think more about it
      - Score 7: the answer mostly answers the question but is missing or hallucinating on one critical aspect.
          - Example:
              - Question: How to terminate a databricks cluster
              - Answer: In the Databricks workspace, navigate to the "Clusters" tab.
                Find the cluster you want to terminate from the list of active clusters.
                And then you'll find a button to terminate all clusters at once.
      - Score 10: the answer correctly answers the question and is not missing any major aspect.
          - Example:
              - Question: How to terminate a databricks cluster
              - Answer: In the Databricks workspace, navigate to the "Clusters" tab.
                Find the cluster you want to terminate from the list of active clusters.
                Click on the down-arrow next to the cluster name to open the cluster details.
                Click on the "Terminate" button. A confirmation dialog will appear. Click "Terminate" again to confirm the action.
  Provided question:
  {question}
  Provided answer:
  {answer}
  Provided context:
  {context}
  Please provide your grading for the correctness and explain why you gave the particular grading"""
class Evaluater:
    def __init__(self, item):
        self.question = item["question"]
        self.answer = item["answer"]
        self.domain = item["domain"]
        self.context = item["context"]
        self.llm = HuggingFaceHub(
            repo_id="google/flan-t5-xxl",
            model_kwargs={"temperature": 1, "max_length": 1000000},
        )

    def get_prompt_template(self):
        prompt = BasePromptContext()
        template = prompt.base_template
        variables = prompt.variables_list
        eval_template = PromptTemplate(input_variables=variables, template=template)
        return eval_template

    def evaluate(self):
        prompt = self.get_prompt_template().format(
            question=self.question, answer=self.answer, context=self.context
        )
        score = self.llm(prompt)
        return score
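# Illustrative usage sketch (not in the original file): the judge expects a dict with
# question/answer/domain/context keys and returns the raw grading text from the LLM.
# The example values are placeholders.
# item = {
#     "question": "How to terminate a databricks cluster",
#     "answer": "Open the Clusters tab, pick the cluster and click Terminate.",
#     "domain": "databricks",
#     "context": "...",
# }
# print(Evaluater(item).evaluate())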
# Run the LLM judge over a single question/answer/context item
def _create_evaluation_scenario(item):
    output = {
        "input": item,
        "score": Evaluater(item).evaluate(),
    }
    return output
def rag(domain: str, question: str, evaluate: bool):
    db_exists = _check_if_db_exists(db_path=f"{os.getcwd()}/index/{domain}/documents")
    print(db_exists)

    embeddings = _load_embeddings_from_db(db_exists, domain)

    # Create extractor instance
    # extractor = Extractor(embeddings, "google/flan-t5-base")
    # extractor = Extractor(embeddings, "TheBloke/Llama-2-7B-GGUF")
    # extractor = Extractor(embeddings, "distilbert-base-cased-distilled-squad")
    print("before calling extractor")
    extractor = Extractor(embeddings, "google/flan-t5-base")

    # llm = HuggingFaceHub(
    #     repo_id="google/flan-t5-xxl",
    #     model_kwargs={"temperature": 1, "max_length": 1000000},
    # )

    print("before doing Q&A")
    answer = _search(question, extractor)

    # text = _prompt(question)
    # text += "\n" + "\n".join(x["text"] for x in embeddings.search(question))
    context_list = sorted(embeddings.search(question), key=lambda x: x["id"])
    context = "\n".join(x["text"] for x in context_list)

    if evaluate:
        scored_value = _create_evaluation_scenario({
            "question": question,
            "answer": answer,
            "domain": domain,
            "context": context,
        })
    else:
        scored_value = {
            "input": {"question": question, "answer": answer, "domain": domain, "context": context},
            "score": "Evaluation is Turned OFF",
        }

    return {"question": question, "answer": answer, "context": context, "score": scored_value["score"]}