import json
import os
import time
from pathlib import Path
import gradio as gr
from gradio import ChatMessage
from agents.financial_agent import FinancialAdvisorAgent
from agents.tools import FinancialTools
# Avatar configuration
AVATAR_IMAGES = (
None,
"./public/images/fin_logo.png",
)
# Initialize tools and agent
financial_tools = FinancialTools(tavily_api_key=os.getenv("TAVILY_API_KEY"))
tools = financial_tools.get_all_tools()
agent = FinancialAdvisorAgent(tools=tools, openai_api_key=os.getenv("OPENAI_API_KEY"))
gr.set_static_paths(paths=[(Path.cwd() / "public" / "images").absolute()])
def analyze_data_with_repl(data_type, data):
"""Analyze financial data using Python REPL with comprehensive insights"""
if data_type == "budget":
try:
budget_data = json.loads(data)
categories = list(budget_data.get("current_expenses", {}).keys())
values = list(budget_data.get("current_expenses", {}).values())
income = budget_data.get("monthly_income", budget_data.get("income", 0))
if categories and values:
total_expenses = sum(values)
analysis_text = "💰 **Comprehensive Budget Analysis**\n\n"
# Income vs Expenses Overview
analysis_text += "## 📈 **Income vs Expenses Overview**\n"
analysis_text += f"- **Monthly Income**: ${income:,.0f}\n"
analysis_text += f"- **Total Expenses**: ${total_expenses:,.0f}\n"
if income > 0:
remaining = income - total_expenses
savings_rate = (remaining / income * 100) if income > 0 else 0
if remaining > 0:
analysis_text += f"- **💚 Surplus**: ${remaining:,.0f}\n"
analysis_text += f"- **💎 Savings Rate**: {savings_rate:.1f}%\n"
else:
analysis_text += f"- **🔴 Deficit**: ${abs(remaining):,.0f}\n"
analysis_text += f"- **⚠️ Overspending**: {abs(savings_rate):.1f}%\n"
# Expense Breakdown with Progress Bars
analysis_text += "\n## 💳 **Expense Breakdown**\n"
for i, (category, amount) in enumerate(zip(categories, values)):
percentage = (amount / total_expenses * 100) if total_expenses > 0 else 0
income_percentage = (amount / income * 100) if income > 0 else 0
bar = "█" * min(int(percentage / 3), 30) # Max 30 chars
analysis_text += f"**{category.title()}**: ${amount:,.0f}\n"
analysis_text += f" └─ {percentage:.1f}% of expenses | {income_percentage:.1f}% of income {bar}\n\n"
# Financial Health Metrics
analysis_text += "## 📊 **Financial Health Metrics**\n"
avg_expense = total_expenses / len(values)
largest_expense = max(values)
smallest_expense = min(values)
largest_category = categories[values.index(largest_expense)]
smallest_category = categories[values.index(smallest_expense)]
analysis_text += f"- **Average Category Expense**: ${avg_expense:,.0f}\n"
analysis_text += f"- **Highest Expense**: {largest_category} (${largest_expense:,.0f})\n"
analysis_text += f"- **Lowest Expense**: {smallest_category} (${smallest_expense:,.0f})\n"
analysis_text += f"- **Expense Range**: ${largest_expense - smallest_expense:,.0f}\n"
# Budget Recommendations
analysis_text += "\n## 💡 **Smart Budget Insights**\n"
# 50/30/20 Rule Analysis
if income > 0:
needs_target = income * 0.50
wants_target = income * 0.30
savings_target = income * 0.20
analysis_text += f"**50/30/20 Rule Comparison:**\n"
analysis_text += f"- Needs Target (50%): ${needs_target:,.0f}\n"
analysis_text += f"- Wants Target (30%): ${wants_target:,.0f}\n"
analysis_text += f"- Savings Target (20%): ${savings_target:,.0f}\n"
if savings_rate >= 20:
analysis_text += "✅ **Excellent savings rate!**\n"
elif savings_rate >= 10:
analysis_text += "⚠️ **Good savings, aim for 20%**\n"
else:
analysis_text += "🔴 **Consider reducing expenses to save more**\n"
# Category Warnings
for category, amount in zip(categories, values):
if income > 0:
cat_percentage = (amount / income * 100)
if category.lower() in ['rent', 'housing'] and cat_percentage > 30:
analysis_text += f"⚠️ **Housing costs high**: {cat_percentage:.1f}% (recommend <30%)\n"
elif category.lower() in ['food', 'dining'] and cat_percentage > 15:
analysis_text += f"⚠️ **Food costs high**: {cat_percentage:.1f}% (recommend <15%)\n"
return analysis_text
except Exception as e:
return f"Error analyzing budget data: {str(e)}"
elif data_type == "portfolio":
try:
portfolio_data = json.loads(data)
holdings = portfolio_data.get("holdings", [])
total_value = sum(holding.get("value", 0) for holding in holdings)
analysis_text = "📊 **Advanced Portfolio Analysis**\n\n"
# Portfolio Overview
analysis_text += "## 💼 **Portfolio Overview**\n"
analysis_text += f"- **Total Portfolio Value**: ${total_value:,.2f}\n"
analysis_text += f"- **Number of Holdings**: {len(holdings)}\n"
if holdings:
values = [holding.get("value", 0) for holding in holdings]
avg_holding = sum(values) / len(values)
max_holding = max(values)
min_holding = min(values)
analysis_text += f"- **Average Holding Size**: ${avg_holding:,.2f}\n"
analysis_text += f"- **Largest Position**: ${max_holding:,.2f}\n"
analysis_text += f"- **Smallest Position**: ${min_holding:,.2f}\n"
# Detailed Holdings breakdown
analysis_text += "\n## 📈 **Holdings Breakdown**\n"
sorted_holdings = sorted(holdings, key=lambda x: x.get("value", 0), reverse=True)
for i, holding in enumerate(sorted_holdings, 1):
symbol = holding.get("symbol", "Unknown")
value = holding.get("value", 0)
shares = holding.get("shares", 0)
allocation = holding.get("allocation", (value/total_value*100) if total_value > 0 else 0)
sector = holding.get("sector", "Unknown")
# Calculate position concentration risk
risk_level = "🟢 Low" if allocation < 10 else "🟡 Medium" if allocation < 25 else "🔴 High"
analysis_text += f"**#{i} {symbol}** - {sector}\n"
analysis_text += f" └─ Value: ${value:,.2f} | Shares: {shares:,.0f} | Weight: {allocation:.1f}%\n"
analysis_text += f" └─ Concentration Risk: {risk_level}\n\n"
# Sector analysis with advanced metrics
sectors = {}
sector_values = {}
for holding in holdings:
sector = holding.get("sector", "Unknown")
allocation = holding.get("allocation", 0)
value = holding.get("value", 0)
sectors[sector] = sectors.get(sector, 0) + allocation
sector_values[sector] = sector_values.get(sector, 0) + value
if sectors:
analysis_text += "## 🏭 **Sector Diversification Analysis**\n"
sorted_sectors = sorted(sectors.items(), key=lambda x: x[1], reverse=True)
for sector, allocation in sorted_sectors:
bar = "█" * min(int(allocation / 2), 30)
value = sector_values.get(sector, 0)
# Sector concentration assessment
if allocation > 40:
risk_emoji = "🔴"
risk_text = "Over-concentrated"
elif allocation > 25:
risk_emoji = "🟡"
risk_text = "Moderate concentration"
else:
risk_emoji = "🟢"
risk_text = "Well diversified"
analysis_text += f"**{sector}**: {allocation:.1f}% (${value:,.2f}) {risk_emoji}\n"
analysis_text += f" └─ {bar} {risk_text}\n\n"
# Portfolio Health Metrics
analysis_text += "## 🎯 **Portfolio Health Assessment**\n"
# Diversification Score
num_sectors = len(sectors)
if num_sectors >= 8:
diversification = "🟢 Excellent"
elif num_sectors >= 5:
diversification = "🟡 Good"
else:
diversification = "🔴 Poor"
analysis_text += f"- **Sector Diversification**: {diversification} ({num_sectors} sectors)\n"
# Concentration Risk
if holdings:
top_3_allocation = sum(sorted([h.get("allocation", 0) for h in holdings], reverse=True)[:3])
if top_3_allocation > 60:
concentration_risk = "🔴 High"
elif top_3_allocation > 40:
concentration_risk = "🟡 Medium"
else:
concentration_risk = "🟢 Low"
analysis_text += f"- **Concentration Risk**: {concentration_risk} (Top 3: {top_3_allocation:.1f}%)\n"
# Portfolio Recommendations
analysis_text += "\n## 💡 **Portfolio Optimization Recommendations**\n"
# Check for over-concentration
for holding in holdings:
allocation = holding.get("allocation", 0)
if allocation > 25:
analysis_text += f"⚠️ **{holding.get('symbol', 'Unknown')}** is over-weighted at {allocation:.1f}% (consider rebalancing)\n"
# Sector recommendations
for sector, allocation in sectors.items():
if allocation > 40:
analysis_text += f"⚠️ **{sector}** sector over-weighted at {allocation:.1f}% (consider diversification)\n"
# Diversification suggestions
if num_sectors < 5:
analysis_text += "💡 **Consider adding exposure to more sectors for better diversification**\n"
if len(holdings) < 10:
analysis_text += "💡 **Consider adding more holdings to reduce single-stock risk**\n"
return analysis_text
except Exception as e:
return f"Error analyzing portfolio data: {str(e)}"
elif data_type == "stock":
try:
stock_data = json.loads(data)
symbol = stock_data.get("symbol", "Unknown")
price_str = stock_data.get("current_price", "0")
analysis_text = f"📈 **Comprehensive Stock Analysis: {symbol}**\n\n"
# Company Overview
analysis_text += "## 🏢 **Company Overview**\n"
analysis_text += f"- **Symbol**: {symbol}\n"
analysis_text += f"- **Current Price**: {price_str}\n"
analysis_text += f"- **Company**: {stock_data.get('company_name', 'N/A')}\n"
analysis_text += f"- **Sector**: {stock_data.get('sector', 'N/A')}\n"
analysis_text += f"- **Industry**: {stock_data.get('industry', 'N/A')}\n"
analysis_text += f"- **Market Cap**: {stock_data.get('market_cap', 'N/A')}\n\n"
# Financial Metrics
financials = stock_data.get("financials", {})
if financials:
analysis_text += "## 💹 **Key Financial Metrics**\n"
# Valuation metrics
pe_ratio = financials.get("pe_ratio", "N/A")
pb_ratio = financials.get("pb_ratio", "N/A")
ps_ratio = financials.get("ps_ratio", "N/A")
analysis_text += f"- **P/E Ratio**: {pe_ratio}"
if pe_ratio != "N/A" and isinstance(pe_ratio, (int, float)):
if pe_ratio < 15:
analysis_text += " 🟢 (Undervalued)"
elif pe_ratio > 25:
analysis_text += " 🔴 (Potentially Overvalued)"
else:
analysis_text += " 🟡 (Fairly Valued)"
analysis_text += "\n"
analysis_text += f"- **P/B Ratio**: {pb_ratio}\n"
analysis_text += f"- **P/S Ratio**: {ps_ratio}\n"
# Profitability metrics
analysis_text += f"- **ROE**: {financials.get('roe', 'N/A')}\n"
analysis_text += f"- **ROA**: {financials.get('roa', 'N/A')}\n"
analysis_text += f"- **Profit Margin**: {financials.get('profit_margin', 'N/A')}\n"
analysis_text += f"- **Revenue Growth**: {financials.get('revenue_growth', 'N/A')}\n\n"
# Performance analysis with trend indicators
performance = stock_data.get("performance", {})
if performance:
analysis_text += "## 📊 **Performance Analysis**\n"
periods = ["1d", "1w", "1m", "3m", "6m", "1y", "ytd"]
for period in periods:
if period in performance:
return_pct = performance[period]
# Add trend indicators
if isinstance(return_pct, str) and "%" in return_pct:
try:
pct_value = float(return_pct.replace("%", ""))
if pct_value > 0:
trend = "📈"
elif pct_value < 0:
trend = "📉"
else:
trend = "➡️"
except:
trend = ""
else:
trend = ""
analysis_text += f"- **{period.upper()}**: {return_pct} {trend}\n"
analysis_text += "\n"
# Advanced Risk Assessment
risk_data = stock_data.get("risk_assessment", {})
if risk_data:
analysis_text += "## ⚠️ **Risk Assessment**\n"
risk_level = risk_data.get('risk_level', 'N/A')
volatility = risk_data.get('volatility_30d', 'N/A')
beta = risk_data.get('beta', 'N/A')
# Risk level with emoji indicators
if risk_level.lower() == "low":
risk_emoji = "🟢"
elif risk_level.lower() == "medium":
risk_emoji = "🟡"
elif risk_level.lower() == "high":
risk_emoji = "🔴"
else:
risk_emoji = ""
analysis_text += f"- **Risk Level**: {risk_level} {risk_emoji}\n"
analysis_text += f"- **30-Day Volatility**: {volatility}\n"
analysis_text += f"- **Beta**: {beta}"
if beta != "N/A" and isinstance(beta, (int, float)):
if beta > 1.2:
analysis_text += " (High volatility vs market)"
elif beta < 0.8:
analysis_text += " (Low volatility vs market)"
else:
analysis_text += " (Similar to market)"
analysis_text += "\n\n"
# Technical Analysis
technical = stock_data.get("technical_analysis", {})
if technical:
analysis_text += "## 📈 **Technical Analysis**\n"
analysis_text += f"- **50-Day MA**: {technical.get('ma_50', 'N/A')}\n"
analysis_text += f"- **200-Day MA**: {technical.get('ma_200', 'N/A')}\n"
analysis_text += f"- **RSI**: {technical.get('rsi', 'N/A')}\n"
analysis_text += f"- **Support Level**: {technical.get('support', 'N/A')}\n"
analysis_text += f"- **Resistance Level**: {technical.get('resistance', 'N/A')}\n\n"
# Investment Recommendation with detailed reasoning
recommendation = stock_data.get("recommendation", {})
if recommendation:
action = recommendation.get('action', 'N/A')
confidence = recommendation.get('confidence', 'N/A')
reasoning = recommendation.get('reasoning', '')
analysis_text += "## 💡 **Investment Recommendation**\n"
# Action with emoji
if action.lower() == "buy":
action_emoji = "🟢"
elif action.lower() == "sell":
action_emoji = "🔴"
elif action.lower() == "hold":
action_emoji = "🟡"
else:
action_emoji = ""
analysis_text += f"- **Action**: {action} {action_emoji}\n"
analysis_text += f"- **Confidence**: {confidence}\n"
if reasoning:
analysis_text += f"- **Reasoning**: {reasoning}\n"
analysis_text += "\n"
# Additional Investment Considerations
analysis_text += "## 🎯 **Investment Considerations**\n"
# Dividend info
dividend_yield = stock_data.get("dividend_yield", "N/A")
if dividend_yield != "N/A":
analysis_text += f"- **Dividend Yield**: {dividend_yield}\n"
# Analyst ratings
analyst_rating = stock_data.get("analyst_rating", "N/A")
if analyst_rating != "N/A":
analysis_text += f"- **Analyst Rating**: {analyst_rating}\n"
# Price targets
price_target = stock_data.get("price_target", "N/A")
if price_target != "N/A":
analysis_text += f"- **Price Target**: {price_target}\n"
# ESG score
esg_score = stock_data.get("esg_score", "N/A")
if esg_score != "N/A":
analysis_text += f"- **ESG Score**: {esg_score}\n"
return analysis_text
except Exception as e:
return f"Error analyzing stock data: {str(e)}"
return None
def determine_intended_tool(message):
"""Determine which tool the AI intends to use based on the message"""
message_lower = message.lower()
tool_detection_map = {
"budget_planner": ["budget", "income", "expense", "spending", "allocat", "monthly", "plan", "financial plan", "money"],
"investment_analyzer": ["stock", "invest", "buy", "sell", "analyze", "AAPL", "GOOGL", "TSLA", "share", "equity"],
"portfolio_analyzer": ["portfolio", "holdings", "allocation", "diversif", "asset", "position"],
"market_trends": ["market", "trend", "news", "sector", "economic", "latest", "current"],
"expense_tracker": ["track", "expense", "spending", "categoriz", "cost"]
}
tool_names = {
"budget_planner": "Budget Planner",
"investment_analyzer": "Investment Analyzer",
"expense_tracker": "Expense Tracker",
"market_trends": "Market Trends Analyzer",
"portfolio_analyzer": "Portfolio Analyzer",
}
for tool_key, keywords in tool_detection_map.items():
if any(keyword in message_lower for keyword in keywords):
return tool_key, tool_names.get(tool_key, tool_key)
return None, None
def determine_response_type(message):
"""Determine if user wants detailed report or short response"""
message_lower = message.lower()
# Keywords indicating detailed response preference
detailed_keywords = [
"detailed", "detail", "comprehensive", "thorough", "in-depth", "full analysis",
"complete", "report", "breakdown", "explain", "elaborate", "deep dive",
"extensive", "detailed analysis", "full report", "comprehensive report"
]
# Keywords indicating short response preference
short_keywords = [
"quick", "brief", "short", "summary", "concise", "simple", "fast",
"just tell me", "quickly", "in short", "tldr", "bottom line"
]
# Check for detailed indicators first
if any(keyword in message_lower for keyword in detailed_keywords):
return "detailed"
# Check for short indicators
if any(keyword in message_lower for keyword in short_keywords):
return "short"
# Default to short response
return "short"
def process_financial_query(message, history):
"""Process user queries through the financial agent with streaming response"""
# Get the actual user message from the last entry in history
if not history or len(history) == 0:
return history
# Extract the last user message
last_user_message = None
for msg in reversed(history):
if msg["role"] == "user":
last_user_message = msg["content"]
break
if not last_user_message:
return history
# Convert Gradio history to agent format (excluding the last user message we just added)
agent_history = []
for i, msg in enumerate(history[:-1]): # Exclude the last message
agent_history.append(
{
"role": msg["role"],
"content": msg["content"]
if isinstance(msg["content"], str)
else str(msg["content"]),
}
)
# Start timer
start_time = time.time()
init_message_start_index = len(history)
try:
# Show what tool will be used and processing status
intended_tool_key, intended_tool_name = determine_intended_tool(last_user_message)
response_type = determine_response_type(last_user_message)
# Always show status for all tools with expected time estimates
if intended_tool_name:
if intended_tool_key == "market_trends":
status_msg = f"🔍 Fetching market news & analyzing trends (estimated 20-30 seconds)..."
elif intended_tool_key == "investment_analyzer":
status_msg = f"📈 Analyzing stock data & calculating metrics (estimated 10-15 seconds)..."
elif intended_tool_key == "budget_planner":
status_msg = f"💰 Processing budget analysis (estimated 5-10 seconds)..."
elif intended_tool_key == "portfolio_analyzer":
status_msg = f"📊 Analyzing portfolio data (estimated 8-12 seconds)..."
elif intended_tool_key == "expense_tracker":
status_msg = f"💳 Processing expense analysis (estimated 5-10 seconds)..."
else:
status_msg = f"🔄 Using {intended_tool_name} (estimated 5-15 seconds)..."
history.append(ChatMessage(role="assistant", content=status_msg))
yield history
else:
# If no tool detected, show generic processing message
history.append(ChatMessage(role="assistant", content=f"🧠 Processing your request (estimated 10-15 seconds)..."))
yield history
# Process message through agent
response, tool_used, tool_result, all_tools, all_results = agent.process_message_with_details(
last_user_message, agent_history
)
# Clear the processing message now that tool is complete
if len(history) > init_message_start_index:
history.pop() # Remove the processing message
# Step 5: Show tool execution results
if all_tools and all_results:
# Remove initialization messages but keep all previous conversation and tool info
history = history[:init_message_start_index]
tool_names = {
"budget_planner": "Budget Planner",
"investment_analyzer": "Investment Analyzer",
"expense_tracker": "Expense Tracker",
"market_trends": "Market Trends Analyzer",
"portfolio_analyzer": "Portfolio Analyzer",
}
tool_emojis = {
"Budget Planner": "💰",
"Investment Analyzer": "📈",
"Expense Tracker": "💳",
"Market Trends Analyzer": "📰",
"Portfolio Analyzer": "📊",
}
# Show results for all tools used
for i, (used_tool, result) in enumerate(zip(all_tools, all_results)):
tool_display_name = tool_names.get(used_tool, used_tool)
if result:
# Format tool result for display
try:
import json
if result.startswith("{") or result.startswith("["):
# Pretty format JSON output
parsed_result = json.loads(result)
# Truncate very long results for display
if len(str(parsed_result)) > 2000:
# Show summary for long results
if isinstance(parsed_result, dict):
summary = {
k: f"[{type(v).__name__}]"
if isinstance(v, (list, dict))
else v
for k, v in list(parsed_result.items())[:10]
}
display_result = f"```json\n{json.dumps(summary, indent=2)}\n... (truncated)\n```"
else:
display_result = f"```json\n{json.dumps(parsed_result, indent=2)[:1000]}...\n```"
else:
formatted_result = json.dumps(parsed_result, indent=2)
display_result = f"```json\n{formatted_result}\n```"
else:
# Truncate non-JSON results
display_result = (
result[:1000] + "..."
if len(result) > 1000
else result
)
except Exception as e:
display_result = (
str(result)[:1000] + "..."
if len(str(result)) > 1000
else str(result)
)
tool_emoji = tool_emojis.get(tool_display_name, "🔧")
collapsible_content = f"""
{tool_emoji} {tool_display_name} Results - Click to expand
{display_result}
🔍 {tool_display_name} Data Analysis - Click to expand
{analysis_data}