import os
import sys
import subprocess
import re
import multiprocessing
import atexit
import io
import logging
from collections.abc import Iterator

import gradio as gr
import gradio.themes as themes
from huggingface_hub import hf_hub_download
import pandas as pd
import torch

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Install llama-cpp-python with the appropriate backend if it is missing
try:
    from llama_cpp import Llama
except ModuleNotFoundError:
    if torch.cuda.is_available():
        logger.info("Installing llama-cpp-python with CUDA support.")
        os.environ['CMAKE_ARGS'] = "-DLLAMA_CUDA=ON"
    else:
        logger.info("Installing llama-cpp-python without additional flags.")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "llama-cpp-python",
                           "--force-reinstall", "--upgrade", "--no-cache-dir"])
    from llama_cpp import Llama

# Install yfinance if not present (for CAGR calculations)
try:
    import yfinance as yf
except ModuleNotFoundError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "yfinance"])
    import yfinance as yf

# Additional imports for visualization and chart rendering
try:
    import matplotlib.pyplot as plt
    from PIL import Image
except ModuleNotFoundError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "matplotlib", "pillow"])
    import matplotlib.pyplot as plt
    from PIL import Image

MAX_MAX_NEW_TOKENS = 512
DEFAULT_MAX_NEW_TOKENS = 400
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "1024"))

DESCRIPTION = """
# FinChat: Investing Q&A (Optimized for Speed)
This application delivers an interactive chat interface powered by a small, highly efficient AI model adapted to investing and finance questions through specialized prompt engineering. It aims for rapid, reasoned responses. Duplicate this Space for customization or queue-free deployment.
Runs on CPU, or on GPU when available. Uses the Phi-2 model for faster inference. Inference is heavily optimized, targeting responses in under 10 seconds for simple queries, with output capped at 400 tokens by default. For longer responses, increase 'Max New Tokens' in Advanced Settings. Brief delays may occur in free-tier environments due to shared resources, but the smaller model keeps typical generation speeds high.
"""

LICENSE = """
---
This application employs the Phi-2 model, governed by Microsoft's Terms of Use. Refer to the [model card](https://huggingface.co/TheBloke/phi-2-GGUF) for details.
"""

# Load the model (skip fine-tuning for faster startup)
try:
    model_path = hf_hub_download(
        repo_id="TheBloke/phi-2-GGUF",
        filename="phi-2.Q4_K_M.gguf"
    )
    n_gpu_layers = -1 if torch.cuda.is_available() else 0  # -1 offloads all layers to the GPU
    llm = Llama(
        model_path=model_path,
        n_ctx=1024,
        n_batch=1024,  # Increased for faster prompt processing
        n_threads=multiprocessing.cpu_count(),
        n_gpu_layers=n_gpu_layers,
        chat_format="chatml"  # Phi-2 uses the ChatML format in llama.cpp
    )
    logger.info(f"Model loaded successfully with n_gpu_layers={n_gpu_layers}.")
    # Warm up the model for faster initial inference
    llm("Warm-up prompt", max_tokens=1, echo=False)
    logger.info("Model warm-up completed.")
except Exception as e:
    logger.error(f"Error loading model: {str(e)}")
    raise

# Register an explicit close for llm to avoid a destructor error at interpreter exit
atexit.register(llm.close)

DEFAULT_SYSTEM_PROMPT = """You are FinChat, a knowledgeable AI assistant specializing in investing and finance.
Provide accurate, helpful, reasoned, and concise answers to investing questions. Always base responses on reliable information and advise users to consult professionals for personalized advice. Always respond exclusively in English. Use bullet points for clarity.
Example:
User: average return for TSLA between 2010 and 2020
Assistant: - TSLA CAGR (2010-2020): ~63.01%
- Represents average annual return with compounding
- Past performance not indicative of future results
- Consult a financial advisor"""


def generate(
    message: str,
    chat_history: list[dict],
    system_prompt: str = DEFAULT_SYSTEM_PROMPT,
    max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    logger.info(f"Generating response for message: {message}")
    lower_message = message.lower().strip()

    # Canned fast paths that skip model inference entirely
    if lower_message in ["hi", "hello"]:
        response = "I'm FinChat, your financial advisor. Ask me anything finance-related!"
        logger.info("Quick response for 'hi'/'hello' generated.")
        yield response
        return

    if "what is cagr" in lower_message:
        response = """- CAGR stands for Compound Annual Growth Rate.
- It measures the mean annual growth rate of an investment over a specified period longer than one year, accounting for compounding.
- Formula: CAGR = (Ending Value / Beginning Value)^(1 / Number of Years) - 1
- Useful for comparing investments over time.
- Past performance not indicative of future results. Consult a financial advisor."""
        logger.info("Quick response for 'what is cagr' generated.")
        yield response
        return
    # Check for CAGR/average return queries, e.g. "cagr for AAPL, MSFT between 2010 and 2020"
    match = re.match(r'(?:average return|cagr) for ([\w\s,]+(?:and [\w\s,]+)?) between (\d{4}) and (\d{4})', lower_message)
    if match:
        tickers_str, start_year, end_year = match.groups()
        tickers = [t.strip().upper() for t in re.split(r',|\band\b', tickers_str) if t.strip()]
        responses = []
        if int(end_year) <= int(start_year):
            yield "The specified time period is invalid (end year must be after start year)."
            return
        for ticker in tickers:
            try:
                # Download daily prices; auto_adjust=False keeps the 'Adj Close' column,
                # which recent yfinance versions drop by default
                data = yf.download(ticker, start=f"{start_year}-01-01", end=f"{end_year}-12-31",
                                   progress=False, auto_adjust=False)
                if not data.empty:
                    # Flatten MultiIndex columns that yfinance can return even for a single ticker
                    if isinstance(data.columns, pd.MultiIndex):
                        data.columns = data.columns.get_level_values(0)
                    if 'Adj Close' not in data.columns:
                        responses.append(f"- {ticker}: Error - Adjusted Close price data not available.")
                        logger.error(f"No 'Adj Close' column for {ticker}.")
                        continue
                    initial = data['Adj Close'].iloc[0]
                    final = data['Adj Close'].iloc[-1]
                    start_date = data.index[0]
                    end_date = data.index[-1]
                    days = (end_date - start_date).days
                    years = days / 365.25
                    if years > 0 and pd.notna(initial) and pd.notna(final):
                        cagr = ((final / initial) ** (1 / years) - 1) * 100
                        responses.append(f"- {ticker}: ~{cagr:.2f}%")
                    else:
                        responses.append(f"- {ticker}: Invalid period or missing price data.")
                else:
                    responses.append(f"- {ticker}: No historical data available between {start_year} and {end_year}.")
            except Exception as e:
                logger.error(f"Error calculating CAGR for {ticker}: {str(e)}")
                responses.append(f"- {ticker}: Error calculating CAGR - {str(e)}")
        full_response = (
            f"CAGR for the requested stocks from {start_year} to {end_year}:\n"
            + "\n".join(responses)
            + "\n- Represents average annual returns with compounding"
            + "\n- Past performance not indicative of future results"
            + "\n- Consult a financial advisor"
        )
        # Strip any stray ChatML-style tokens
        full_response = re.sub(r'<\|(?:im_start|im_end|system|user|assistant)\|>|\[END\]', '', full_response).strip()
        # Estimate the token count to ensure the response fits within max_new_tokens
        response_tokens = len(llm.tokenize(full_response.encode("utf-8"), add_bos=False))
        if response_tokens > max_new_tokens:
            logger.warning(f"CAGR response tokens ({response_tokens}) exceed max_new_tokens ({max_new_tokens}). Truncating to first complete sentence.")
            sentence_endings = ['.', '!', '?']
            first_sentence_end = min(
                [full_response.find(ending) + 1 for ending in sentence_endings if full_response.find(ending) != -1],
                default=len(full_response)
            )
            full_response = (full_response[:first_sentence_end] if first_sentence_end > 0
                             else "Response truncated due to length; please increase Max New Tokens.")
        logger.info("CAGR response generated.")
        yield full_response
        return

    # Build conversation messages (limit history to the last 3 messages for speed)
    conversation = [{"role": "system", "content": system_prompt}]
    for msg in chat_history[-3:]:
        if msg["role"] == "user":
            conversation.append({"role": "user", "content": msg["content"]})
        elif msg["role"] == "assistant":
            conversation.append({"role": "assistant", "content": msg["content"]})
    conversation.append({"role": "user", "content": message})

    # Approximate token-length check; drop oldest history until the prompt fits
    prompt_text = "\n".join(d["content"] for d in conversation)
    input_tokens = llm.tokenize(prompt_text.encode("utf-8"), add_bos=False)
    while len(input_tokens) > MAX_INPUT_TOKEN_LENGTH:
        logger.warning(f"Input tokens ({len(input_tokens)}) exceed limit ({MAX_INPUT_TOKEN_LENGTH}). Truncating history.")
        if len(conversation) > 2:  # Preserve the system prompt and the current user message
            conversation.pop(1)  # Remove the oldest history message
            prompt_text = "\n".join(d["content"] for d in conversation)
            input_tokens = llm.tokenize(prompt_text.encode("utf-8"), add_bos=False)
        else:
            yield "Error: Input is too long even after truncation. Please shorten your query."
            return
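    # Note: with chat_format="chatml", llama.cpp renders each message roughly as
    #   <|im_start|>{role}\n{content}<|im_end|>
    # so the plain "\n".join() estimate above slightly undercounts the real
    # prompt. MAX_INPUT_TOKEN_LENGTH also equals n_ctx (1024) here, so a
    # maximal prompt plus generated tokens can still overflow the context
    # window; the "exceed context window" ValueError handler below covers that.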
    # Generate the response with sentence-boundary checking and token cleanup
    try:
        response = ""
        sentence_buffer = ""
        token_count = 0
        stream = llm.create_chat_completion(
            messages=conversation,
            max_tokens=max_new_tokens,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            repeat_penalty=repetition_penalty,
            stream=True
        )
        sentence_endings = ['.', '!', '?']
        for chunk in stream:
            delta = chunk["choices"][0]["delta"]
            if "content" in delta and delta["content"] is not None:
                # Clean the chunk by removing ChatML tokens and similar markers
                cleaned_chunk = re.sub(r'<\|(?:im_start|im_end|system|user|assistant)\|>|\[END\]', '', delta["content"])
                if not cleaned_chunk:
                    continue
                sentence_buffer += cleaned_chunk
                response += cleaned_chunk
                # Approximate token count for the chunk
                chunk_tokens = len(llm.tokenize(cleaned_chunk.encode("utf-8"), add_bos=False))
                token_count += chunk_tokens
                # Yield the cumulative response at each sentence boundary
                if any(sentence_buffer.strip().endswith(ending) for ending in sentence_endings):
                    yield response
                    sentence_buffer = ""  # Clear the buffer after a complete sentence
                # If approaching the token limit, stop at the last complete sentence
                if token_count >= max_new_tokens - 10:  # Small buffer to avoid mid-word truncation
                    last_sentence_end = max(
                        [response.rfind(ending) for ending in sentence_endings if response.rfind(ending) != -1],
                        default=-1
                    )
                    if last_sentence_end != -1:
                        response = response[:last_sentence_end + 1]
                        yield response
                        break
                    else:
                        # No sentence boundary at all: fall back to the first sentence or a notice
                        first_sentence_end = min(
                            [response.find(ending) + 1 for ending in sentence_endings if response.find(ending) != -1],
                            default=len(response)
                        )
                        response = (response[:first_sentence_end] if first_sentence_end > 0
                                    else "Response truncated due to length; please increase Max New Tokens.")
                        yield response
                        break
            if chunk["choices"][0]["finish_reason"] is not None:
                # Yield whatever complete sentence remains in the buffer
                if sentence_buffer.strip():
                    last_sentence_end = max(
                        [sentence_buffer.rfind(ending) for ending in sentence_endings if sentence_buffer.rfind(ending) != -1],
                        default=-1
                    )
                    if last_sentence_end != -1:
                        response = response[:response.rfind(sentence_buffer) + last_sentence_end + 1]
                yield response
                break
        logger.info("Response generation completed.")
    except ValueError as ve:
        if "exceed context window" in str(ve):
            yield "Error: Prompt too long for context window. Please try a shorter query or clear history."
        else:
            logger.error(f"Error during response generation: {str(ve)}")
            yield f"Error generating response: {str(ve)}"
    except Exception as e:
        logger.error(f"Error during response generation: {str(e)}")
        yield f"Error generating response: {str(e)}"
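# Minimal usage sketch for generate() outside Gradio (hypothetical, for local
# testing only; not part of the app's control flow):
#
#   for partial in generate("What is CAGR?", chat_history=[]):
#       print(partial)
#
# Each yielded string is the cumulative response so far, trimmed to sentence
# boundaries, which is the shape the streaming chatbot handler below expects.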
def process_portfolio(df, growth_rate):
    if df is None or len(df) == 0:
        return "", None
    # Convert to a DataFrame if Gradio handed us a list of rows
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df, columns=["Ticker", "Shares", "Avg Cost", "Current Price"])
    df = df.dropna(subset=["Ticker"])
    portfolio = {}
    for _, row in df.iterrows():
        ticker = row["Ticker"].upper() if pd.notna(row["Ticker"]) else None
        if not ticker:
            continue
        shares = float(row["Shares"]) if pd.notna(row["Shares"]) else 0
        cost = float(row["Avg Cost"]) if pd.notna(row["Avg Cost"]) else 0
        price = float(row["Current Price"]) if pd.notna(row["Current Price"]) else 0
        value = shares * price
        portfolio[ticker] = {'shares': shares, 'cost': cost, 'price': price, 'value': value}
    if not portfolio:
        return "", None

    total_value_now = sum(v['value'] for v in portfolio.values())
    allocations = {k: v['value'] / total_value_now for k, v in portfolio.items()} if total_value_now > 0 else {}

    # Render the allocation pie chart to a PIL image
    fig_alloc, ax_alloc = plt.subplots()
    ax_alloc.pie(allocations.values(), labels=allocations.keys(), autopct='%1.1f%%')
    ax_alloc.set_title('Portfolio Allocation')
    buf_alloc = io.BytesIO()
    fig_alloc.savefig(buf_alloc, format='png')
    buf_alloc.seek(0)
    chart_alloc = Image.open(buf_alloc)
    plt.close(fig_alloc)  # Close the figure to free memory

    def project_value(value, years, rate):
        return value * (1 + rate / 100) ** years

    total_value_1yr = sum(project_value(v['value'], 1, growth_rate) for v in portfolio.values())
    total_value_2yr = sum(project_value(v['value'], 2, growth_rate) for v in portfolio.values())
    total_value_5yr = sum(project_value(v['value'], 5, growth_rate) for v in portfolio.values())
    total_value_10yr = sum(project_value(v['value'], 10, growth_rate) for v in portfolio.values())

    data_str = (
        "User portfolio:\n"
        + "\n".join(f"- {k}: {v['shares']} shares, avg cost {v['cost']}, current price {v['price']}, value ${v['value']:,.2f}"
                    for k, v in portfolio.items())
        + f"\nTotal value now: ${total_value_now:,.2f}\nProjected (at {growth_rate}% annual growth):\n"
          f"- 1 year: ${total_value_1yr:,.2f}\n- 2 years: ${total_value_2yr:,.2f}\n"
          f"- 5 years: ${total_value_5yr:,.2f}\n- 10 years: ${total_value_10yr:,.2f}"
    )
    return data_str, chart_alloc


def fetch_current_prices(df):
    if df is None or len(df) == 0:
        return df
    # Convert to a DataFrame if Gradio handed us a list of rows
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df, columns=["Ticker", "Shares", "Avg Cost", "Current Price"])
    for i in df.index:
        ticker = df.at[i, "Ticker"]
        if pd.notna(ticker) and str(ticker).strip():
            try:
                price = yf.Ticker(str(ticker).upper()).info.get('currentPrice', None)
                if price is not None:
                    df.at[i, "Current Price"] = price
            except Exception as e:
                logger.warning(f"Failed to fetch price for {ticker}: {str(e)}")
    return df
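# Worked example of the compound-growth projection in process_portfolio
# (illustrative figures): project_value(10_000, 10, 10)
#   = 10_000 * (1 + 10 / 100) ** 10 ≈ 25_937.42
# i.e. a $10,000 portfolio at an assumed 10% annual growth rate grows about
# 2.6-fold over the 10-year horizon.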
# Gradio interface setup
with gr.Blocks(theme=themes.Soft(), css="""#chatbot {height: 800px; overflow: auto;}""") as demo:
    gr.Markdown(DESCRIPTION)
    chatbot = gr.Chatbot(label="FinChat", type="messages", elem_id="chatbot")  # elem_id matches the CSS rule above
    msg = gr.Textbox(
        label="Ask a finance question",
        placeholder="e.g., 'What is CAGR?' or 'Average return for AAPL between 2010 and 2020'",
        info="Enter your query here. Portfolio data will be appended if provided."
    )
    with gr.Row():
        submit = gr.Button("Submit", variant="primary")
        clear = gr.Button("Clear")
    gr.Examples(
        examples=["What is CAGR?", "Average return for AAPL between 2010 and 2020", "Hi", "Explain compound interest"],
        inputs=msg,
        label="Example Queries"
    )
    with gr.Accordion("Enter Portfolio for Projections", open=False):
        portfolio_df = gr.Dataframe(
            headers=["Ticker", "Shares", "Avg Cost", "Current Price"],
            datatype=["str", "number", "number", "number"],
            row_count=3,
            col_count=(4, "fixed"),
            label="Portfolio Data",
            interactive=True
        )
        gr.Markdown("Enter your stocks here. You can add more rows by editing the table.")
        fetch_button = gr.Button("Fetch Current Prices", variant="secondary")
        fetch_button.click(fetch_current_prices, inputs=portfolio_df, outputs=portfolio_df)
        growth_rate = gr.Slider(minimum=5, maximum=50, step=5, value=10, label="Annual Growth Rate (%)",
                                interactive=True, info="Select the assumed annual growth rate for projections.")
        growth_rate_label = gr.Markdown("**Selected Growth Rate: 10%**")
    with gr.Accordion("Advanced Settings", open=False):
        system_prompt = gr.Textbox(label="System Prompt", value=DEFAULT_SYSTEM_PROMPT, lines=6, info="Customize the AI's system prompt.")
        temperature = gr.Slider(label="Temperature", value=0.6, minimum=0.0, maximum=1.0, step=0.05, info="Controls randomness: lower is more deterministic.")
        top_p = gr.Slider(label="Top P", value=0.9, minimum=0.0, maximum=1.0, step=0.05, info="Nucleus sampling: higher includes more diverse tokens.")
        top_k = gr.Slider(label="Top K", value=50, minimum=1, maximum=100, step=1, info="Top-K sampling: limits to top K tokens.")
        repetition_penalty = gr.Slider(label="Repetition Penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, info="Penalizes repeated tokens.")
        max_new_tokens = gr.Slider(label="Max New Tokens", value=DEFAULT_MAX_NEW_TOKENS, minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, info="Maximum length of generated response.")
    gr.Markdown(LICENSE)

    def update_growth_rate_label(growth_rate):
        return f"**Selected Growth Rate: {growth_rate}%**"

    def user(message, history):
        if not message:
            return "", history
        return "", history + [{"role": "user", "content": message}]

    def bot(history, sys_prompt, temp, tp, tk, rp, mnt, portfolio_df, growth_rate):
        if not history:
            logger.warning("History is empty, initializing with user message.")
            history = [{"role": "user", "content": ""}]
        message = history[-1]["content"]
        portfolio_data, chart_alloc = process_portfolio(portfolio_df, growth_rate)
        if portfolio_data:
            message += "\n" + portfolio_data
        history[-1]["content"] = message
        history.append({"role": "assistant", "content": ""})
        for new_text in generate(message, history[:-1], sys_prompt, mnt, temp, tp, tk, rp):
            history[-1]["content"] = new_text
            yield history, f"**Selected Growth Rate: {growth_rate}%**"
        if chart_alloc:
            # The "messages" chatbot accepts a component as message content
            # (assumes a recent Gradio version); a bare "image" key is ignored
            history.append({"role": "assistant", "content": gr.Image(value=chart_alloc)})
            yield history, f"**Selected Growth Rate: {growth_rate}%**"

    growth_rate.change(update_growth_rate_label, inputs=growth_rate, outputs=growth_rate_label)
    submit.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot,
        [chatbot, system_prompt, temperature, top_p, top_k, repetition_penalty, max_new_tokens, portfolio_df, growth_rate],
        [chatbot, growth_rate_label]
    )
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot,
        [chatbot, system_prompt, temperature, top_p, top_k, repetition_penalty, max_new_tokens, portfolio_df, growth_rate],
        [chatbot, growth_rate_label]
    )
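    # Note on the wiring above: user() runs with queue=False so the message
    # echoes into the chat immediately, and the chained .then() call streams
    # bot() through the request queue. The Submit button and the Enter key
    # share the same handler pair, so both paths behave identically.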
    clear.click(lambda: [], None, chatbot, queue=False)

demo.queue(max_size=128).launch()