# Hugging Face Spaces page header (Space status: Sleeping) - not part of the program.
import gradio as gr | |
import torch | |
from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.vectorstores import FAISS | |
from langchain.chains import ConversationalRetrievalChain | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain.memory import ConversationBufferMemory | |
from langchain_community.llms import HuggingFaceEndpoint | |
import os | |
import time | |
# Load models and tokenizers.
# Each checkpoint id is named once so a tokenizer and its model cannot drift
# apart; the loads run at import time, in the order listed below.
_SENTIMENT_CHECKPOINT = "dnzblgn/Sentiment-Analysis-Customer-Reviews"
_SARCASM_CHECKPOINT = "dnzblgn/Sarcasm-Detection-Customer-Reviews"
_CATEGORY_CHECKPOINT = "dnzblgn/Customer-Reviews-Classification"

sentiment_tokenizer = AutoTokenizer.from_pretrained(_SENTIMENT_CHECKPOINT)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(_SENTIMENT_CHECKPOINT)

sarcasm_tokenizer = AutoTokenizer.from_pretrained(_SARCASM_CHECKPOINT)
sarcasm_model = AutoModelForSequenceClassification.from_pretrained(_SARCASM_CHECKPOINT)

doc_tokenizer = AutoTokenizer.from_pretrained(_CATEGORY_CHECKPOINT)
doc_model = AutoModelForSequenceClassification.from_pretrained(_CATEGORY_CHECKPOINT)
# Review categories, listed in class-id order: a name's position in this
# tuple is the integer id it maps to.
_CATEGORY_NAMES = (
    "shipping_and_delivery",
    "customer_service",
    "price_and_value",
    "quality_and_performance",
    "use_and_design",
    "other",
)

# name -> class id
label_mapping = {name: index for index, name in enumerate(_CATEGORY_NAMES)}
# class id -> name
reverse_label_mapping = dict(enumerate(_CATEGORY_NAMES))
def _predict_class(tokenizer, model, text):
    """Return the index of the highest-scoring class `model` gives `text`.

    The text is tokenized with truncation to 512 tokens and the forward pass
    runs under ``torch.no_grad()`` since only the prediction is needed.
    """
    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=512,
    )
    with torch.no_grad():
        logits = model(**inputs).logits
    return torch.argmax(logits, dim=-1).item()


def analyze_reviews(reviews):
    """Count positive/negative reviews overall and per category.

    Each review goes through up to three classifiers:

    1. sentiment - class 0 is treated as positive, anything else as negative;
    2. sarcasm - run only for positive reviews; class 1 flips the review to
       negative;
    3. category - the predicted class id is translated through
       ``reverse_label_mapping``.

    Args:
        reviews: Iterable of review strings.

    Returns:
        A dict ``{"overall": {"positive": int, "negative": int},
        "categories": {<category>: {"positive": int, "negative": int}}}``
        with one entry per key of ``label_mapping``.

    Raises:
        KeyError: If the category model predicts a class id that is not in
            ``reverse_label_mapping``.
    """
    analysis = {
        "overall": {"positive": 0, "negative": 0},
        "categories": {
            label: {"positive": 0, "negative": 0} for label in label_mapping
        },
    }

    for review in reviews:
        sentiment_class = _predict_class(sentiment_tokenizer, sentiment_model, review)
        sentiment = "positive" if sentiment_class == 0 else "negative"

        # Sarcasm is only checked for positive reviews: a sarcastic
        # "positive" review is counted as negative.
        if sentiment == "positive":
            sarcasm_class = _predict_class(sarcasm_tokenizer, sarcasm_model, review)
            if sarcasm_class == 1:
                sentiment = "negative"

        category_class = _predict_class(doc_tokenizer, doc_model, review)
        category = reverse_label_mapping[category_class]

        analysis["overall"][sentiment] += 1
        analysis["categories"][category][sentiment] += 1

    return analysis
def _percentage(part, whole):
    """Return `part` as a percentage of `whole`, or 0 when `whole` is not positive."""
    return (part / whole) * 100 if whole > 0 else 0


def generate_analysis_document(analysis):
    """Render the review analysis as a plain-text report.

    The report has an overall section followed by one section per category;
    sections are separated by ``--END--`` marker lines.

    Args:
        analysis: Dict with an ``"overall"`` entry and a ``"categories"``
            entry, each holding ``"positive"`` / ``"negative"`` counts (the
            structure built by ``analyze_reviews``).

    Returns:
        The report as a single newline-joined string. Percentages are shown
        as 0% when the corresponding total is zero, so an analysis with no
        reviews no longer raises ``ZeroDivisionError``.
    """
    overall_positive = analysis["overall"]["positive"]
    overall_negative = analysis["overall"]["negative"]
    total_reviews = overall_positive + overall_negative

    doc = [
        "Overall Sentiment Analysis:",
        f"Positive Feedback: {overall_positive} comments "
        f"({_percentage(overall_positive, total_reviews):.0f}%)",
        f"Negative Feedback: {overall_negative} comments "
        f"({_percentage(overall_negative, total_reviews):.0f}%)",
        "--END--",
        "Category-Specific Analysis:",
        "--END--",
    ]

    for category, feedback in analysis["categories"].items():
        total_category = feedback["positive"] + feedback["negative"]
        positive_rate = _percentage(feedback["positive"], total_category)
        negative_rate = _percentage(feedback["negative"], total_category)
        doc.extend([
            f"{category.capitalize()}:",
            f"- Positive Feedback: {feedback['positive']} comments ({positive_rate:.0f}%)",
            f"- Negative Feedback: {feedback['negative']} comments ({negative_rate:.0f}%)",
            "--END--",
        ])

    return "\n".join(doc)
def write_analysis_to_file(analysis_document):
    """Write the report text to ``processed_analysis.txt`` and return that path.

    The file is created in the current working directory and any previous
    contents are overwritten.
    """
    output_path = "processed_analysis.txt"
    with open(output_path, "w") as report:
        report.write(analysis_document)
    return output_path
def read_processed_file():
    """Return the full text of ``processed_analysis.txt``.

    Raises:
        FileNotFoundError: If the report file does not exist in the current
            working directory.
    """
    with open("processed_analysis.txt", "r") as report:
        contents = report.read()
    return contents
def create_db_from_analysis(analysis_document):
    """Split the report into chunks, embed them, and index them in FAISS.

    Args:
        analysis_document: The report text to index.

    Returns:
        The FAISS vector store built from the chunks.
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=64)
    chunks = splitter.create_documents([analysis_document])
    # HuggingFaceEmbeddings() is constructed with its library defaults.
    return FAISS.from_documents(chunks, HuggingFaceEmbeddings())
def initialize_chatbot(vector_db):
    """Build a conversational retrieval chain over `vector_db`.

    The chain pairs a hosted Mistral-7B-Instruct endpoint with a retriever
    obtained from `vector_db` and a buffer memory stored under the
    ``"chat_history"`` key. The API token is read from the
    ``HUGGINGFACE_API_TOKEN`` environment variable (``None`` if unset).

    Args:
        vector_db: Vector store exposing ``as_retriever()``.

    Returns:
        The configured ``ConversationalRetrievalChain``.
    """
    api_token = os.environ.get("HUGGINGFACE_API_TOKEN")
    llm = HuggingFaceEndpoint(
        repo_id="mistralai/Mistral-7B-Instruct-v0.2",
        huggingfacehub_api_token=api_token,
        temperature=0.5,
        max_new_tokens=256,
    )
    return ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=vector_db.as_retriever(),
        memory=ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
        ),
        verbose=False,
    )
def process_and_initialize(file):
    """Analyze an uploaded review file and prepare the chatbot for it.

    The file is read as UTF-8 with one review per non-blank line. The reviews
    are classified, summarized into a text report, and the report is indexed
    for retrieval.

    Args:
        file: Path of the uploaded file, or ``None`` if nothing was uploaded.

    Returns:
        A ``(vector_db, qa_chain, status_message)`` tuple. On any failure the
        first two items are ``None`` and the message explains the problem;
        exceptions are reported through the message rather than raised so the
        UI always receives a status.
    """
    if file is None:
        return None, None, "Please upload a file first."
    try:
        if not os.path.exists(file):
            return None, None, "File not found. Please try uploading again."
        with open(file, "r", encoding="utf-8") as f:
            reviews = [line.strip() for line in f if line.strip()]
        if not reviews:
            return None, None, "File is empty. Please upload a file with reviews."

        analysis = analyze_reviews(reviews)
        analysis_doc = generate_analysis_document(analysis)

        # The report is still written to disk, but the index is built from
        # the in-memory text: the report path is fixed and shared, so reading
        # it back could pick up a report written by a concurrent session.
        write_analysis_to_file(analysis_doc)
        db = create_db_from_analysis(analysis_doc)
        qa = initialize_chatbot(db)
        return db, qa, f"Successfully processed {len(reviews)} reviews! Ready for questions."
    except Exception as e:
        # UI boundary: convert any failure into a status message.
        return None, None, f"Processing error: {str(e)}"
def user_query_typing_effect(query, qa_chain, chatbot):
    """Answer `query` and stream the reply one character at a time.

    This is a generator: each yield is ``(history, "")`` where ``history`` is
    the message list for the chat widget (``{"role", "content"}`` dicts) and
    the empty string clears the input box.

    Args:
        query: The user's question.
        qa_chain: Object exposing ``invoke({...})`` that returns a mapping
            with an ``"answer"`` key, or ``None`` if no file has been
            processed yet.
        chatbot: Existing message history, or ``None``/empty.

    Yields:
        ``(history, "")`` after every character of the answer. A blank query
        yields the unchanged history once. A missing chain or a failure while
        querying yields the user's message followed by one assistant message
        describing the problem.
    """
    # Work on a copy so the list passed in by the caller is never mutated.
    history = list(chatbot) if chatbot else []

    if not query or not query.strip():
        yield history, ""
        return

    history.append({"role": "user", "content": query})

    if qa_chain is None:
        history.append({
            "role": "assistant",
            "content": "Please upload and process a review file before asking questions.",
        })
        yield history, ""
        return

    try:
        response = qa_chain.invoke({"question": query, "chat_history": []})
        assistant_response = response["answer"]
    except Exception as e:
        # UI boundary: show the failure in the chat instead of crashing.
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        yield history, ""
        return

    history.append({"role": "assistant", "content": ""})
    if not assistant_response:
        # Nothing to type out; still emit one update so the turn is shown.
        yield history, ""
        return

    for char in assistant_response:
        history[-1]["content"] += char
        yield history, ""
        time.sleep(0.05)  # Slower typing effect
def demo():
    """Build the Gradio interface and launch it.

    The left column holds the file upload, the "process" button and a
    read-only status box; the right column holds the chat widget, the
    question box and the "Ask" button. Processing results (vector store and
    QA chain) are kept in per-session ``gr.State`` holders.
    """
    custom_css = """
    body {
        background-color: #FF8C00;
        font-family: Arial, sans-serif;
    }
    .gradio-container {
        border-radius: 15px;
        box-shadow: 0px 4px 20px rgba(0, 0, 0, 0.3);
        padding: 20px;
    }
    footer {
        visibility: hidden;
    }
    .chatbot {
        border: 2px solid #000;
        border-radius: 10px;
        background-color: #FFF5E1;
    }
    """

    with gr.Blocks(css=custom_css) as app:
        # Session state filled in by process_and_initialize.
        vector_db = gr.State(None)
        qa_chain = gr.State(None)

        gr.Markdown("### π **Customer Review Analysis and Chatbot** π")
        gr.Markdown("#### Upload your review file and ask questions interactively!")

        with gr.Row():
            with gr.Column(scale=1):
                txt_file = gr.File(
                    label="π Upload Reviews",
                    file_types=[".txt"],
                    type="filepath",
                )
                analyze_btn = gr.Button("π Process Reviews")
                status = gr.Textbox(
                    label="π Status",
                    placeholder="Status updates will appear here...",
                    interactive=False,
                )
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    label="π€ Chat with your data",
                    height=600,
                    bubble_full_width=False,
                    show_label=False,
                    render_markdown=True,
                    type="messages",
                    elem_classes=["chatbot"],
                )
                query_input = gr.Textbox(
                    label="Ask a question",
                    placeholder="Ask about the reviews...",
                    show_label=False,
                    container=False,
                )
                query_btn = gr.Button("Ask")

        analyze_btn.click(
            fn=process_and_initialize,
            inputs=[txt_file],
            outputs=[vector_db, qa_chain, status],
            show_progress="minimal",
        )

        # The "Ask" button and pressing Enter in the question box trigger the
        # same handler with the same wiring.
        ask_wiring = dict(
            fn=user_query_typing_effect,
            inputs=[query_input, qa_chain, chatbot],
            outputs=[chatbot, query_input],
            show_progress="minimal",
        )
        for register in (query_btn.click, query_input.submit):
            register(**ask_wiring)

    app.launch()


if __name__ == "__main__":
    demo()