# -*- coding: utf-8 -*- """app.ipynb Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1BmH6jAmykO3k3aZv-Cjz-TWvDrDzrB10 """ # ============================================================================= # Imports & Setup # ============================================================================= import os import numpy as np import pandas as pd import faiss # For fast vector similarity search from sentence_transformers import SentenceTransformer # For generating text embeddings from rank_bm25 import BM25Okapi # For BM25 keyword-based retrieval import spacy # For tokenization from sklearn.metrics.pairwise import cosine_similarity # For computing cosine similarity from sklearn.preprocessing import normalize # For normalizing BM25 scores # For the Gradio UI import gradio as gr # For response generation using a small language model (we use FLAN-T5-Small) from transformers import pipeline, set_seed # Set a random seed for reproducibility set_seed(42) # Load SpaCy English model (make sure to download it with: python -m spacy download en_core_web_sm) nlp = spacy.load("en_core_web_sm") # ============================================================================= # 1. Data Collection & Preprocessing # ============================================================================= # Load the CSV file containing financial data. # (Make sure the CSV file "MSFT_1986-03-13_2025-02-04.csv" is in the "data" folder) csv_file_path = r"MSFT_1986-03-13_2025-02-04.csv" # Adjust the path if necessary # Load the CSV file into a DataFrame df = pd.read_csv(csv_file_path) # Display basic info about the dataset print(df.info()) # Data Cleaning & Structuring # Convert 'Date' column to datetime format df['Date'] = pd.to_datetime(df['Date']) # Sort data by Date df = df.sort_values(by='Date') # Extract Year from Date df['Year'] = df['Date'].dt.year # Aggregate data by Year to generate financial summaries yearly_summary = df.groupby('Year').agg( Open_Min=('Open', 'min'), Open_Max=('Open', 'max'), Close_Min=('Close', 'min'), Close_Max=('Close', 'max'), Avg_Volume=('Volume', 'mean') ).reset_index() # Create a textual summary for each year yearly_summary['Summary'] = yearly_summary.apply( lambda row: f"In {row['Year']}, the stock opened between ${row['Open_Min']:.2f} and ${row['Open_Max']:.2f}, " f"while closing between ${row['Close_Min']:.2f} and ${row['Close_Max']:.2f}. " f"The average trading volume was {row['Avg_Volume']:,.0f} shares.", axis=1 ) # Display the cleaned and structured data print(yearly_summary.head()) # Use this for terminal/console # yearly_summary.head() # Use this in Jupyter Notebook # ============================================================================= # 2. Basic RAG Implementation # ============================================================================= # Convert financial summaries into text chunks and generate vector embeddings. embedding_model = SentenceTransformer("all-MiniLM-L6-v2") # Convert yearly financial summaries into vector embeddings summary_texts = yearly_summary["Summary"].tolist() # Extract summaries as text summary_embeddings = embedding_model.encode(summary_texts, convert_to_numpy=True) # Generate embeddings # Store embeddings as a NumPy array for further processing summary_embeddings.shape # This should be (num_years, embedding_size) # Define the dimension of embeddings (384 from MiniLM model) embedding_dim = 384 # Create a FAISS index (Flat index for now, can be optimized later) faiss_index = faiss.IndexFlatL2(embedding_dim) # Convert embeddings to float32 (FAISS requires this format) summary_embeddings = summary_embeddings.astype('float32') # Add embeddings to the FAISS index faiss_index.add(summary_embeddings) # Store the year information for retrieval year_map = {i: yearly_summary["Year"].iloc[i] for i in range(len(yearly_summary))} # Verify that embeddings are stored successfully faiss_index.ntotal # ============================================================================= # 3. Advanced RAG Implementation # ============================================================================= # 3.1: BM25 for Keyword-Based Search # Tokenize each summary using SpaCy (tokens are converted to lowercase). tokenized_summaries = [[token.text.lower() for token in nlp(summary)] for summary in summary_texts] # Build the BM25 index. bm25 = BM25Okapi(tokenized_summaries) # 3.2: Define Retrieval Functions def retrieve_similar_summaries(query_text, top_k=3): """ Retrieve similar financial summaries using FAISS vector search. """ query_embedding = embedding_model.encode([query_text], convert_to_numpy=True).astype('float32') distances, indices = faiss_index.search(query_embedding, top_k) results = [] for idx in indices[0]: results.append((year_map[idx], yearly_summary.iloc[idx]["Summary"])) return pd.DataFrame(results, columns=["Year", "Summary"]) def hybrid_retrieve(query_text, top_k=3, alpha=0.5): """ Hybrid retrieval combining FAISS (vector search) and BM25 (keyword search). Scores are combined using the weighting factor 'alpha'. """ query_embedding = embedding_model.encode([query_text], convert_to_numpy=True).astype('float32') _, faiss_indices = faiss_index.search(query_embedding, top_k) bm25_scores = bm25.get_scores([token.text.lower() for token in nlp(query_text)]) bm25_top_indices = np.argsort(bm25_scores)[::-1][:top_k] combined_scores = {} for rank, idx in enumerate(faiss_indices[0]): combined_scores[idx] = alpha * (top_k - rank) bm25_norm_scores = normalize([bm25_scores])[0] for rank, idx in enumerate(bm25_top_indices): if idx in combined_scores: combined_scores[idx] += (1 - alpha) * (top_k - rank) else: combined_scores[idx] = (1 - alpha) * (top_k - rank) sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True) results = [(year_map[idx], yearly_summary.iloc[idx]["Summary"]) for idx, _ in sorted_results] return pd.DataFrame(results, columns=["Year", "Summary"]) def adaptive_retrieve(query_text, top_k=3, alpha=0.5): """ Adaptive retrieval re-ranks results by combining FAISS and BM25 scores. """ query_embedding = embedding_model.encode([query_text], convert_to_numpy=True).astype('float32') _, faiss_indices = faiss_index.search(query_embedding, top_k) query_tokens = [token.text.lower() for token in nlp(query_text)] bm25_scores = bm25.get_scores(query_tokens) bm25_top_indices = np.argsort(bm25_scores)[::-1][:top_k] faiss_scores = np.linspace(1, 0, num=top_k) bm25_norm_scores = normalize([bm25_scores])[0] combined_scores = {} for rank, idx in enumerate(faiss_indices[0]): combined_scores[idx] = alpha * faiss_scores[rank] for idx in bm25_top_indices: if idx in combined_scores: combined_scores[idx] += (1 - alpha) * bm25_norm_scores[idx] else: combined_scores[idx] = (1 - alpha) * bm25_norm_scores[idx] sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True) results = [(year_map[idx], yearly_summary.iloc[idx]["Summary"]) for idx, _ in sorted_results] return pd.DataFrame(results, columns=["Year", "Summary"]) def merge_similar_chunks(threshold=0.95): """ Chunk Merging: Merge similar financial summaries based on cosine similarity. This reduces redundancy when multiple chunks are very similar. """ merged_summaries = [] used_indices = set() for i in range(len(summary_embeddings)): if i in used_indices: continue similarities = cosine_similarity([summary_embeddings[i]], summary_embeddings)[0] similar_indices = np.where(similarities >= threshold)[0] merged_text = " ".join(yearly_summary.iloc[idx]["Summary"] for idx in similar_indices) merged_summaries.append((yearly_summary.iloc[i]["Year"], merged_text)) used_indices.update(similar_indices) return pd.DataFrame(merged_summaries, columns=["Year", "Merged Summary"]) # Optional: Check merged summaries for debugging. merged_summary_df = merge_similar_chunks(threshold=0.95) print("Merged summaries shape:", merged_summary_df.shape) merged_summary_df.head() # ============================================================================= # 4. UI Development using Gradio (Updated for newer API) # ============================================================================= def generate_response(query_text, top_k=3, alpha=0.5): """ Generate an answer for a financial query by: - Validating the query with an input-side guardrail. - Retrieving context using adaptive retrieval. - Generating a refined answer using FLAN-T5-Small. Returns: answer (str): The generated answer. confidence (float): A mock confidence score based on BM25 scores. """ # ----------------------------------------------------------------------------- # Guard Rail Implementation (Input-Side) # ----------------------------------------------------------------------------- financial_keywords = ["open", "close", "stock", "price", "volume", "trading"] if not any(keyword in query_text.lower() for keyword in financial_keywords): return ("Guardrail Triggered: Your query does not appear to be related to financial data. Please ask a financial question."), 0.0 # Retrieve context using adaptive retrieval. context_df = adaptive_retrieve(query_text, top_k=top_k, alpha=alpha) context_text = " ".join(context_df["Summary"].tolist()) # Adjust the prompt to provide clear instructions. prompt = f"Given the following financial data:\n{context_text}\nAnswer this question: {query_text}." # Use FLAN-T5-Small for text generation via the text2text-generation pipeline. # Increase max_length to allow longer answers. generator = pipeline('text2text-generation', model='google/flan-t5-small') generated = generator(prompt, max_length=200, num_return_sequences=1) answer = generated[0]['generated_text'].replace(prompt, "").strip() # Fallback message if answer is empty. if not answer: answer = "I'm sorry, I couldn't generate a clear answer. Please try rephrasing your question." # Compute a mock confidence score using normalized BM25 scores. query_tokens = [token.text.lower() for token in nlp(query_text)] bm25_scores = bm25.get_scores(query_tokens) max_score = np.max(bm25_scores) if np.max(bm25_scores) > 0 else 1 confidence = round(np.mean(bm25_scores) / max_score, 2) return answer, confidence # Create the Gradio interface using the new API. iface = gr.Interface( fn=generate_response, inputs=gr.Textbox(lines=2, placeholder="Enter your financial question here..."), outputs=[gr.Textbox(label="Answer"), gr.Textbox(label="Confidence Score")], title="Financial RAG Model Interface", description=("Ask questions based on the company's financial summaries " ) ) # Launch the Gradio interface. iface.launch() # ============================================================================= # 6. Testing & Validation (Updated) # ============================================================================= def print_test_results(query_text, top_k=3, alpha=0.5): answer, confidence = generate_response(query_text, top_k, alpha) print("Question: ", query_text) print("Answer: ", answer) print("Confidence Score: ", confidence) print("-" * 50) # Test 1: High-confidence financial query. query_high = "What year had the lowest stock prices?" print_test_results(query_high) # Test 2: Low-confidence financial query. query_low = "How did the trading volume vary?" print_test_results(query_low) # Test 3: Irrelevant query (should trigger guardrail). query_irrelevant = "What is the capital of France?" print_test_results(query_irrelevant)