# -*- coding: utf-8 -*-
"""Financial RAG Chatbot (SLM + Cosine Similarity Confidence)"""

# =============================================================================
# 📌 Imports & Setup
# =============================================================================
import numpy as np
import pandas as pd
import faiss                                             # Fast vector search
from sentence_transformers import SentenceTransformer   # Text embeddings
from rank_bm25 import BM25Okapi                         # Keyword-based retrieval
import spacy                                            # Tokenization
from sklearn.metrics.pairwise import cosine_similarity  # Confidence scoring
import gradio as gr                                     # UI

# DistilBERT (small language model) for response scoring
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

# Load SpaCy English model
nlp = spacy.load("en_core_web_sm")

# =============================================================================
# 📌 1. Data Collection & Preprocessing
# =============================================================================
# Load the historical MSFT price data
csv_file_path = "MSFT_1986-03-13_2025-02-04.csv"
df = pd.read_csv(csv_file_path)

# Convert 'Date' to datetime, sort chronologically, and extract the year
df['Date'] = pd.to_datetime(df['Date'])
df = df.sort_values(by='Date')
df['Year'] = df['Date'].dt.year

# Aggregate by year to produce one financial summary row per year
yearly_summary = df.groupby('Year').agg(
    Open_Min=('Open', 'min'),
    Open_Max=('Open', 'max'),
    Close_Min=('Close', 'min'),
    Close_Max=('Close', 'max'),
    Avg_Volume=('Volume', 'mean')
).reset_index()

# Render each yearly row as a short textual summary (the retrieval corpus)
yearly_summary['Summary'] = yearly_summary.apply(
    lambda row: f"In {row['Year']}, the stock opened between ${row['Open_Min']:.2f} and ${row['Open_Max']:.2f}, "
                f"while closing between ${row['Close_Min']:.2f} and ${row['Close_Max']:.2f}. "
                f"The average trading volume was {row['Avg_Volume']:,.0f} shares.",
    axis=1
)

# =============================================================================
# 📌 2. RAG: Vector Embeddings + BM25
# =============================================================================
# Embed each yearly summary with a small sentence-transformer model
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

summary_texts = yearly_summary["Summary"].tolist()
summary_embeddings = embedding_model.encode(summary_texts, convert_to_numpy=True).astype('float32')

# Build a FAISS index over the summary embeddings (L2 distance)
embedding_dim = 384  # all-MiniLM-L6-v2 output dimension
faiss_index = faiss.IndexFlatL2(embedding_dim)
faiss_index.add(summary_embeddings)

# Map FAISS row positions back to calendar years for display
year_map = {i: yearly_summary["Year"].iloc[i] for i in range(len(yearly_summary))}

# BM25 for keyword-based retrieval over SpaCy-tokenised summaries
tokenized_summaries = [[token.text.lower() for token in nlp(summary)] for summary in summary_texts]
bm25 = BM25Okapi(tokenized_summaries)
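# -----------------------------------------------------------------------------
# Aside (illustrative sketch, not part of the pipeline): the index above ranks
# by L2 distance, while the confidence score in section 4 uses cosine
# similarity. If the two metrics should agree, an inner-product index over
# L2-normalised embeddings is equivalent to cosine similarity. `cosine_index`
# is a hypothetical name introduced here for illustration only.
# -----------------------------------------------------------------------------
_normed = summary_embeddings / np.linalg.norm(summary_embeddings, axis=1, keepdims=True)
cosine_index = faiss.IndexFlatIP(embedding_dim)  # inner product == cosine on unit vectors
cosine_index.add(_normed.astype('float32'))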
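# -----------------------------------------------------------------------------
# Optional sanity check (illustrative): confirm both retrievers return
# plausible years before combining them below. The sample query is an
# assumption, not drawn from the dataset.
# -----------------------------------------------------------------------------
_q = "Which year had the highest average trading volume?"
_q_emb = embedding_model.encode([_q], convert_to_numpy=True).astype('float32')
_, _faiss_idx = faiss_index.search(_q_emb, 3)
print("FAISS top-3 years:", [year_map[i] for i in _faiss_idx[0]])

_bm25_scores = bm25.get_scores([t.text.lower() for t in nlp(_q)])
print("BM25  top-3 years:", [year_map[i] for i in np.argsort(_bm25_scores)[::-1][:3]])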
# =============================================================================
# 📌 3. Advanced Retrieval (Hybrid + Adaptive)
# =============================================================================
def adaptive_retrieve(query_text, top_k=3, alpha=0.5):
    """Hybrid retrieval combining FAISS (semantic) and BM25 (keyword) scores."""
    # Semantic search: rank-based scores in [1, 0] for the FAISS top-k
    query_embedding = embedding_model.encode([query_text], convert_to_numpy=True).astype('float32')
    _, faiss_indices = faiss_index.search(query_embedding, top_k)
    faiss_scores = np.linspace(1, 0, num=top_k)

    # Keyword search: min-max-normalised BM25 scores over the whole corpus
    query_tokens = [token.text.lower() for token in nlp(query_text)]
    bm25_scores = bm25.get_scores(query_tokens)
    bm25_top_indices = np.argsort(bm25_scores)[::-1][:top_k]
    bm25_norm_scores = (bm25_scores - np.min(bm25_scores)) / (np.max(bm25_scores) - np.min(bm25_scores) + 1e-9)

    # Blend the two signals: alpha weights FAISS, (1 - alpha) weights BM25
    combined_scores = {}
    for rank, idx in enumerate(faiss_indices[0]):
        combined_scores[idx] = alpha * faiss_scores[rank]
    for idx in bm25_top_indices:
        combined_scores[idx] = combined_scores.get(idx, 0) + (1 - alpha) * bm25_norm_scores[idx]

    sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
    results = [(year_map[idx], yearly_summary.iloc[idx]["Summary"], score)
               for idx, score in sorted_results]
    return pd.DataFrame(results, columns=["Year", "Summary", "Score"])

# =============================================================================
# 📌 4. Response Generation using DistilBERT
# =============================================================================
# Load DistilBERT for response scoring
distilbert_tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
distilbert_model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased")

def generate_response(query_text, top_k=3, alpha=0.5):
    """Answer a question from the retrieved summaries, with a confidence score."""
    # Input-side guardrail: only accept questions that look financial
    financial_keywords = ["open", "close", "stock", "price", "volume", "trading"]
    if not any(keyword in query_text.lower() for keyword in financial_keywords):
        return "Guardrail Triggered: Please ask a financial question.", 0.0

    # Retrieve the most relevant yearly summaries as context
    context_df = adaptive_retrieve(query_text, top_k=top_k, alpha=alpha)
    context_text = " ".join(context_df["Summary"].tolist())

    # Confidence = mean cosine similarity between the query and the retrieved summaries
    query_embedding = embedding_model.encode([query_text], convert_to_numpy=True)
    retrieved_embeddings = embedding_model.encode(context_df["Summary"].tolist(), convert_to_numpy=True)
    similarity_scores = cosine_similarity(query_embedding, retrieved_embeddings).flatten()
    confidence = round(float(np.mean(similarity_scores)), 2)

    # Score the query + context with DistilBERT. Note: a sequence-classification
    # head produces logits, not generated text, so the logits are computed here
    # but the final answer below is extractive (the retrieved summaries themselves).
    inputs = distilbert_tokenizer(query_text + " " + context_text,
                                  return_tensors="pt", padding=True, truncation=True)
    outputs = distilbert_model(**inputs)
    logits = outputs.logits.detach().numpy()

    # Extractive answer, truncated to keep the output readable
    answer = context_text[:1000]
    return answer, confidence

# =============================================================================
# 📌 5. UI Development using Gradio
# =============================================================================
iface = gr.Interface(
    fn=generate_response,
    inputs=gr.Textbox(lines=2, placeholder="Enter your financial question here..."),
    outputs=[gr.Textbox(label="Answer"), gr.Textbox(label="Confidence Score")],
    title="Financial RAG Model Interface",
    description="Ask questions based on the company's financial summaries."
)

# Launch the Gradio interface. Note: launch() blocks in a plain script, so the
# tests in section 6 only run after the UI is closed (pass
# prevent_thread_lock=True to continue immediately; notebooks do not block).
iface.launch()
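# -----------------------------------------------------------------------------
# Optional output-side guardrail (illustrative sketch): refuse to answer when
# retrieval confidence is low. `guarded_response` and the 0.3 threshold are
# assumptions introduced for illustration, not tuned values.
# -----------------------------------------------------------------------------
def guarded_response(query_text, threshold=0.3):
    answer, confidence = generate_response(query_text)
    if confidence < threshold:
        return "Low confidence: the retrieved context may not answer this question.", confidence
    return answer, confidence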
# =============================================================================
# 📌 6. Testing & Validation
# =============================================================================
def print_test_results(query_text, top_k=3, alpha=0.5):
    """Run one query through the full pipeline and print the outcome."""
    answer, confidence = generate_response(query_text, top_k, alpha)
    print(f"Question: {query_text}\nAnswer: {answer}\nConfidence Score: {confidence}\n{'-'*50}")

# Test cases: two in-domain financial questions, plus one off-topic question
# that should trip the input guardrail
test_queries = [
    "What year had the lowest stock prices?",
    "How did the trading volume change over the years?",
    "What is the capital of France?"
]

for q in test_queries:
    print_test_results(q)
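# -----------------------------------------------------------------------------
# Optional (illustrative sketch): collect the same test results into a
# DataFrame for easier side-by-side inspection. `results_df` and its column
# names are assumptions introduced here.
# -----------------------------------------------------------------------------
results_df = pd.DataFrame(
    [(q, *generate_response(q)) for q in test_queries],
    columns=["Question", "Answer", "Confidence"],
)
print(results_df[["Question", "Confidence"]])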