uditk99's picture
Upload app.py
739895a verified
# -*- coding: utf-8 -*-
"""app.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1BmH6jAmykO3k3aZv-Cjz-TWvDrDzrB10
"""
# =============================================================================
# Imports & Setup
# =============================================================================
import os
import numpy as np
import pandas as pd
import faiss # For fast vector similarity search
from sentence_transformers import SentenceTransformer # For generating text embeddings
from rank_bm25 import BM25Okapi # For BM25 keyword-based retrieval
import spacy # For tokenization
from sklearn.metrics.pairwise import cosine_similarity # For computing cosine similarity
from sklearn.preprocessing import normalize # For normalizing BM25 scores
# For the Gradio UI
import gradio as gr
# For response generation using a small language model (we use FLAN-T5-Small)
from transformers import pipeline, set_seed
# Set a random seed for reproducibility
set_seed(42)
# Load SpaCy English model (make sure to download it with: python -m spacy download en_core_web_sm)
nlp = spacy.load("en_core_web_sm")
# =============================================================================
# 1. Data Collection & Preprocessing
# =============================================================================
# Load the CSV file containing financial data.
# (Make sure the CSV file "MSFT_1986-03-13_2025-02-04.csv" is in the "data" folder)
csv_file_path = r"MSFT_1986-03-13_2025-02-04.csv" # Adjust the path if necessary
# Load the CSV file into a DataFrame
df = pd.read_csv(csv_file_path)
# Display basic info about the dataset
print(df.info())
# Data Cleaning & Structuring
# Convert 'Date' column to datetime format
df['Date'] = pd.to_datetime(df['Date'])
# Sort data by Date
df = df.sort_values(by='Date')
# Extract Year from Date
df['Year'] = df['Date'].dt.year
# Aggregate data by Year to generate financial summaries
yearly_summary = df.groupby('Year').agg(
Open_Min=('Open', 'min'),
Open_Max=('Open', 'max'),
Close_Min=('Close', 'min'),
Close_Max=('Close', 'max'),
Avg_Volume=('Volume', 'mean')
).reset_index()
# Create a textual summary for each year
yearly_summary['Summary'] = yearly_summary.apply(
lambda row: f"In {row['Year']}, the stock opened between ${row['Open_Min']:.2f} and ${row['Open_Max']:.2f}, "
f"while closing between ${row['Close_Min']:.2f} and ${row['Close_Max']:.2f}. "
f"The average trading volume was {row['Avg_Volume']:,.0f} shares.",
axis=1
)
# Display the cleaned and structured data
print(yearly_summary.head()) # Use this for terminal/console
# yearly_summary.head() # Use this in Jupyter Notebook
# =============================================================================
# 2. Basic RAG Implementation
# =============================================================================
# Convert financial summaries into text chunks and generate vector embeddings.
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
# Convert yearly financial summaries into vector embeddings
summary_texts = yearly_summary["Summary"].tolist() # Extract summaries as text
summary_embeddings = embedding_model.encode(summary_texts, convert_to_numpy=True) # Generate embeddings
# Store embeddings as a NumPy array for further processing
summary_embeddings.shape # This should be (num_years, embedding_size)
# Define the dimension of embeddings (384 from MiniLM model)
embedding_dim = 384
# Create a FAISS index (Flat index for now, can be optimized later)
faiss_index = faiss.IndexFlatL2(embedding_dim)
# Convert embeddings to float32 (FAISS requires this format)
summary_embeddings = summary_embeddings.astype('float32')
# Add embeddings to the FAISS index
faiss_index.add(summary_embeddings)
# Store the year information for retrieval
year_map = {i: yearly_summary["Year"].iloc[i] for i in range(len(yearly_summary))}
# Verify that embeddings are stored successfully
faiss_index.ntotal
# =============================================================================
# 3. Advanced RAG Implementation
# =============================================================================
# 3.1: BM25 for Keyword-Based Search
# Tokenize each summary using SpaCy (tokens are converted to lowercase).
tokenized_summaries = [[token.text.lower() for token in nlp(summary)] for summary in summary_texts]
# Build the BM25 index.
bm25 = BM25Okapi(tokenized_summaries)
# 3.2: Define Retrieval Functions
def retrieve_similar_summaries(query_text, top_k=3):
"""
Retrieve similar financial summaries using FAISS vector search.
"""
query_embedding = embedding_model.encode([query_text], convert_to_numpy=True).astype('float32')
distances, indices = faiss_index.search(query_embedding, top_k)
results = []
for idx in indices[0]:
results.append((year_map[idx], yearly_summary.iloc[idx]["Summary"]))
return pd.DataFrame(results, columns=["Year", "Summary"])
def hybrid_retrieve(query_text, top_k=3, alpha=0.5):
"""
Hybrid retrieval combining FAISS (vector search) and BM25 (keyword search).
Scores are combined using the weighting factor 'alpha'.
"""
query_embedding = embedding_model.encode([query_text], convert_to_numpy=True).astype('float32')
_, faiss_indices = faiss_index.search(query_embedding, top_k)
bm25_scores = bm25.get_scores([token.text.lower() for token in nlp(query_text)])
bm25_top_indices = np.argsort(bm25_scores)[::-1][:top_k]
combined_scores = {}
for rank, idx in enumerate(faiss_indices[0]):
combined_scores[idx] = alpha * (top_k - rank)
bm25_norm_scores = normalize([bm25_scores])[0]
for rank, idx in enumerate(bm25_top_indices):
if idx in combined_scores:
combined_scores[idx] += (1 - alpha) * (top_k - rank)
else:
combined_scores[idx] = (1 - alpha) * (top_k - rank)
sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
results = [(year_map[idx], yearly_summary.iloc[idx]["Summary"]) for idx, _ in sorted_results]
return pd.DataFrame(results, columns=["Year", "Summary"])
def adaptive_retrieve(query_text, top_k=3, alpha=0.5):
"""
Adaptive retrieval re-ranks results by combining FAISS and BM25 scores.
"""
query_embedding = embedding_model.encode([query_text], convert_to_numpy=True).astype('float32')
_, faiss_indices = faiss_index.search(query_embedding, top_k)
query_tokens = [token.text.lower() for token in nlp(query_text)]
bm25_scores = bm25.get_scores(query_tokens)
bm25_top_indices = np.argsort(bm25_scores)[::-1][:top_k]
faiss_scores = np.linspace(1, 0, num=top_k)
bm25_norm_scores = normalize([bm25_scores])[0]
combined_scores = {}
for rank, idx in enumerate(faiss_indices[0]):
combined_scores[idx] = alpha * faiss_scores[rank]
for idx in bm25_top_indices:
if idx in combined_scores:
combined_scores[idx] += (1 - alpha) * bm25_norm_scores[idx]
else:
combined_scores[idx] = (1 - alpha) * bm25_norm_scores[idx]
sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
results = [(year_map[idx], yearly_summary.iloc[idx]["Summary"]) for idx, _ in sorted_results]
return pd.DataFrame(results, columns=["Year", "Summary"])
def merge_similar_chunks(threshold=0.95):
"""
Chunk Merging: Merge similar financial summaries based on cosine similarity.
This reduces redundancy when multiple chunks are very similar.
"""
merged_summaries = []
used_indices = set()
for i in range(len(summary_embeddings)):
if i in used_indices:
continue
similarities = cosine_similarity([summary_embeddings[i]], summary_embeddings)[0]
similar_indices = np.where(similarities >= threshold)[0]
merged_text = " ".join(yearly_summary.iloc[idx]["Summary"] for idx in similar_indices)
merged_summaries.append((yearly_summary.iloc[i]["Year"], merged_text))
used_indices.update(similar_indices)
return pd.DataFrame(merged_summaries, columns=["Year", "Merged Summary"])
# Optional: Check merged summaries for debugging.
merged_summary_df = merge_similar_chunks(threshold=0.95)
print("Merged summaries shape:", merged_summary_df.shape)
merged_summary_df.head()
# =============================================================================
# 4. UI Development using Gradio (Updated for newer API)
# =============================================================================
def generate_response(query_text, top_k=3, alpha=0.5):
"""
Generate an answer for a financial query by:
- Validating the query with an input-side guardrail.
- Retrieving context using adaptive retrieval.
- Generating a refined answer using FLAN-T5-Small.
Returns:
answer (str): The generated answer.
confidence (float): A mock confidence score based on BM25 scores.
"""
# -----------------------------------------------------------------------------
# Guard Rail Implementation (Input-Side)
# -----------------------------------------------------------------------------
financial_keywords = ["open", "close", "stock", "price", "volume", "trading"]
if not any(keyword in query_text.lower() for keyword in financial_keywords):
return ("Guardrail Triggered: Your query does not appear to be related to financial data. Please ask a financial question."), 0.0
# Retrieve context using adaptive retrieval.
context_df = adaptive_retrieve(query_text, top_k=top_k, alpha=alpha)
context_text = " ".join(context_df["Summary"].tolist())
# Adjust the prompt to provide clear instructions.
prompt = f"Given the following financial data:\n{context_text}\nAnswer this question: {query_text}."
# Use FLAN-T5-Small for text generation via the text2text-generation pipeline.
# Increase max_length to allow longer answers.
generator = pipeline('text2text-generation', model='google/flan-t5-small')
generated = generator(prompt, max_length=200, num_return_sequences=1)
answer = generated[0]['generated_text'].replace(prompt, "").strip()
# Fallback message if answer is empty.
if not answer:
answer = "I'm sorry, I couldn't generate a clear answer. Please try rephrasing your question."
# Compute a mock confidence score using normalized BM25 scores.
query_tokens = [token.text.lower() for token in nlp(query_text)]
bm25_scores = bm25.get_scores(query_tokens)
max_score = np.max(bm25_scores) if np.max(bm25_scores) > 0 else 1
confidence = round(np.mean(bm25_scores) / max_score, 2)
return answer, confidence
# Create the Gradio interface using the new API.
iface = gr.Interface(
fn=generate_response,
inputs=gr.Textbox(lines=2, placeholder="Enter your financial question here..."),
outputs=[gr.Textbox(label="Answer"), gr.Textbox(label="Confidence Score")],
title="Financial RAG Model Interface",
description=("Ask questions based on the company's financial summaries "
)
)
# Launch the Gradio interface.
iface.launch()
# =============================================================================
# 6. Testing & Validation (Updated)
# =============================================================================
def print_test_results(query_text, top_k=3, alpha=0.5):
answer, confidence = generate_response(query_text, top_k, alpha)
print("Question: ", query_text)
print("Answer: ", answer)
print("Confidence Score: ", confidence)
print("-" * 50)
# Test 1: High-confidence financial query.
query_high = "What year had the lowest stock prices?"
print_test_results(query_high)
# Test 2: Low-confidence financial query.
query_low = "How did the trading volume vary?"
print_test_results(query_low)
# Test 3: Irrelevant query (should trigger guardrail).
query_irrelevant = "What is the capital of France?"
print_test_results(query_irrelevant)