import gradio as gr
import copy
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import chromadb
from datasets import load_dataset
from sentence_transformers import SentenceTransformer
# Initialize the Llama model
llm = Llama(
# model_path=hf_hub_download(
# repo_id="microsoft/Phi-3-mini-4k-instruct-gguf",
# filename="Phi-3-mini-4k-instruct-q4.gguf",
# ),
model_path=hf_hub_download(
repo_id="Ankitajadhav/Phi-3-mini-4k-instruct-q4.gguf",
filename="Phi-3-mini-4k-instruct-q4.gguf",
),
n_ctx=2048,
n_gpu_layers=50, # Adjust based on your VRAM
)
# Initialize ChromaDB Vector Store
class VectorStore:
def __init__(self, collection_name):
self.embedding_model = SentenceTransformer('sentence-transformers/multi-qa-MiniLM-L6-cos-v1')
self.chroma_client = chromadb.Client()
self.collection = self.chroma_client.create_collection(name=collection_name)
# def populate_vectors(self, texts):
# embeddings = self.embedding_model.encode(texts, batch_size=32).tolist()
# for text, embedding in zip(texts, embeddings, ids):
# self.collection.add(embeddings=[embedding], documents=[text], ids=[doc_id])
# Method to populate the vector store with embeddings from a dataset
def populate_vectors(self, dataset):
# Select the text columns to concatenate
# title = dataset['train']['title_cleaned'][:1000] # Limiting to 100 examples for the demo
recipe = dataset['train']['recipe_new'][:1000]
allergy = dataset['train']['allergy_type'][:1000]
ingredients = dataset['train']['ingredients_alternatives'][:1000]
# Concatenate the text from both columns
texts = [f"{rep} {ingr} {alle}" for rep, ingr,alle in zip(recipe, ingredients,allergy)]
for i, item in enumerate(texts):
embeddings = self.embedding_model.encode(item).tolist()
self.collection.add(embeddings=[embeddings], documents=[item], ids=[str(i)])
def search_context(self, query, n_results=1):
query_embedding = self.embedding_model.encode([query]).tolist()
results = self.collection.query(query_embeddings=query_embedding, n_results=n_results)
return results['documents']
# Example initialization (assuming you've already populated the vector store)
dataset = load_dataset('Thefoodprocessor/recipe_new_with_features_full')
vector_store = VectorStore("embedding_vector")
vector_store.populate_vectors(dataset)
def format_recipe(input_string):
# Clean up the input
cleaned_text = input_string.strip("[]'").replace('\\n', '\n')
# Split the text into lines
lines = cleaned_text.split('\n')
# Initialize sections
title = lines[0]
ingredients = []
instructions = []
substitutions = []
# Extract ingredients and instructions
in_instructions = False
for line in lines[1:]:
if line.startswith("Instructions:"):
in_instructions = True
continue
if in_instructions:
if line.strip(): # Check for non-empty lines
instructions.append(line.strip())
else:
if line.strip(): # Check for non-empty lines
ingredients.append(line.strip())
# Gather substitutions from the last few lines
for line in lines:
if ':' in line:
substitutions.append(line.strip())
# Format output
formatted_recipe = f"## {title}\n\n### Ingredients:\n"
formatted_recipe += '\n'.join(f"- {item}" for item in ingredients) + "\n\n"
formatted_recipe += "### Instructions:\n" + '\n'.join(f"{i + 1}. {line}" for i, line in enumerate(instructions)) + "\n\n"
if substitutions:
formatted_recipe += "### Substitutions:\n" + '\n'.join(f"- **{line.split(':')[0].strip()}**: {line.split(':')[1].strip()}" for line in substitutions) + "\n"
return formatted_recipe
# print(formatted_recipe)
def generate_text(
message,
history: list[tuple[str, str]],
system_message,
max_tokens,
temperature,
top_p,
):
# Retrieve context from vector store
context_results = vector_store.search_context(message, n_results=1)
context = context_results[0] if context_results else ""
input_prompt = f"[INST] <>\n{system_message}\n<>\n\n {context}\n"
for interaction in history:
input_prompt += f"{interaction[0]} [/INST] {interaction[1]} [INST] "
input_prompt += f"{message} [/INST] "
print("Input prompt:", input_prompt) # Debugging output
temp = ""
output = llm(
input_prompt,
temperature=temperature,
top_p=top_p,
top_k=40,
repeat_penalty=1.1,
max_tokens=max_tokens,
stop=["", " \n", "ASSISTANT:", "USER:", "SYSTEM:"],
stream=True,
)
for out in output:
temp += format_recipe(out["choices"][0]["text"])
yield temp
# Define the Gradio interface
demo = gr.ChatInterface(
generate_text,
title="llama-cpp-python on GPU with ChromaDB",
description="Running LLM with context retrieval from ChromaDB",
examples=[
["I have leftover rice, what can I make out of it?"],
["Can I make lunch for two people with this?"],
],
cache_examples=False,
retry_btn=None,
undo_btn="Delete Previous",
clear_btn="Clear",
additional_inputs=[
gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
],
)
if __name__ == "__main__":
demo.launch()