import json
import os
import time
from pathlib import Path
import gradio as gr
from gradio import ChatMessage
from agents.financial_agent import FinancialAdvisorAgent
from agents.tools import FinancialTools
# Avatar configuration
AVATAR_IMAGES = (
None,
"./public/images/fin_logo.png",
)
# Initialize tools and agent
financial_tools = FinancialTools(tavily_api_key=os.getenv("TAVILY_API_KEY"))
tools = financial_tools.get_all_tools()
agent = FinancialAdvisorAgent(tools=tools, openai_api_key=os.getenv("OPENAI_API_KEY"))
gr.set_static_paths(paths=[(Path.cwd() / "public" / "images").absolute()])
def analyze_data_with_repl(data_type, data):
"""Analyze financial data using Python REPL with comprehensive insights"""
if data_type == "budget":
try:
budget_data = json.loads(data)
categories = list(budget_data.get("current_expenses", {}).keys())
values = list(budget_data.get("current_expenses", {}).values())
income = budget_data.get("monthly_income", budget_data.get("income", 0))
if categories and values:
total_expenses = sum(values)
analysis_text = "💰 **Comprehensive Budget Analysis**\n\n"
# Income vs Expenses Overview
analysis_text += "## 📈 **Income vs Expenses Overview**\n"
analysis_text += f"- **Monthly Income**: ${income:,.0f}\n"
analysis_text += f"- **Total Expenses**: ${total_expenses:,.0f}\n"
if income > 0:
remaining = income - total_expenses
savings_rate = (remaining / income * 100) if income > 0 else 0
if remaining > 0:
analysis_text += f"- **💚 Surplus**: ${remaining:,.0f}\n"
analysis_text += f"- **💎 Savings Rate**: {savings_rate:.1f}%\n"
else:
analysis_text += f"- **🔴 Deficit**: ${abs(remaining):,.0f}\n"
analysis_text += (
f"- **⚠️ Overspending**: {abs(savings_rate):.1f}%\n"
)
# Expense Breakdown with Progress Bars
analysis_text += "\n## 💳 **Expense Breakdown**\n"
for i, (category, amount) in enumerate(zip(categories, values)):
percentage = (
(amount / total_expenses * 100) if total_expenses > 0 else 0
)
income_percentage = (amount / income * 100) if income > 0 else 0
bar = "█" * min(int(percentage / 3), 30) # Max 30 chars
analysis_text += f"**{category.title()}**: ${amount:,.0f}\n"
analysis_text += f" └─ {percentage:.1f}% of expenses | {income_percentage:.1f}% of income {bar}\n\n"
# Financial Health Metrics
analysis_text += "## 📊 **Financial Health Metrics**\n"
avg_expense = total_expenses / len(values)
largest_expense = max(values)
smallest_expense = min(values)
largest_category = categories[values.index(largest_expense)]
smallest_category = categories[values.index(smallest_expense)]
analysis_text += (
f"- **Average Category Expense**: ${avg_expense:,.0f}\n"
)
analysis_text += f"- **Highest Expense**: {largest_category} (${largest_expense:,.0f})\n"
analysis_text += f"- **Lowest Expense**: {smallest_category} (${smallest_expense:,.0f})\n"
analysis_text += (
f"- **Expense Range**: ${largest_expense - smallest_expense:,.0f}\n"
)
# Budget Recommendations
analysis_text += "\n## 💡 **Smart Budget Insights**\n"
# 50/30/20 Rule Analysis
if income > 0:
needs_target = income * 0.50
wants_target = income * 0.30
savings_target = income * 0.20
analysis_text += "**50/30/20 Rule Comparison:**\n"
analysis_text += f"- Needs Target (50%): ${needs_target:,.0f}\n"
analysis_text += f"- Wants Target (30%): ${wants_target:,.0f}\n"
analysis_text += f"- Savings Target (20%): ${savings_target:,.0f}\n"
if savings_rate >= 20:
analysis_text += "✅ **Excellent savings rate!**\n"
elif savings_rate >= 10:
analysis_text += "⚠️ **Good savings, aim for 20%**\n"
else:
analysis_text += (
"🔴 **Consider reducing expenses to save more**\n"
)
# Category Warnings
for category, amount in zip(categories, values):
if income > 0:
cat_percentage = amount / income * 100
if (
category.lower() in ["rent", "housing"]
and cat_percentage > 30
):
analysis_text += f"⚠️ **Housing costs high**: {cat_percentage:.1f}% (recommend <30%)\n"
elif (
category.lower() in ["food", "dining"]
and cat_percentage > 15
):
analysis_text += f"⚠️ **Food costs high**: {cat_percentage:.1f}% (recommend <15%)\n"
return analysis_text
except Exception as e:
return f"Error analyzing budget data: {str(e)}"
elif data_type == "portfolio":
try:
portfolio_data = json.loads(data)
holdings = portfolio_data.get("holdings", [])
total_value = sum(holding.get("value", 0) for holding in holdings)
analysis_text = "📊 **Advanced Portfolio Analysis**\n\n"
# Portfolio Overview
analysis_text += "## 💼 **Portfolio Overview**\n"
analysis_text += f"- **Total Portfolio Value**: ${total_value:,.2f}\n"
analysis_text += f"- **Number of Holdings**: {len(holdings)}\n"
if holdings:
values = [holding.get("value", 0) for holding in holdings]
avg_holding = sum(values) / len(values)
max_holding = max(values)
min_holding = min(values)
analysis_text += f"- **Average Holding Size**: ${avg_holding:,.2f}\n"
analysis_text += f"- **Largest Position**: ${max_holding:,.2f}\n"
analysis_text += f"- **Smallest Position**: ${min_holding:,.2f}\n"
# Detailed Holdings breakdown
analysis_text += "\n## 📈 **Holdings Breakdown**\n"
sorted_holdings = sorted(
holdings, key=lambda x: x.get("value", 0), reverse=True
)
for i, holding in enumerate(sorted_holdings, 1):
symbol = holding.get("symbol", "Unknown")
value = holding.get("value", 0)
shares = holding.get("shares", 0)
allocation = holding.get(
"allocation", (value / total_value * 100) if total_value > 0 else 0
)
sector = holding.get("sector", "Unknown")
# Calculate position concentration risk
risk_level = (
"🟢 Low"
if allocation < 10
else "🟡 Medium"
if allocation < 25
else "🔴 High"
)
analysis_text += f"**#{i} {symbol}** - {sector}\n"
analysis_text += f" └─ Value: ${value:,.2f} | Shares: {shares:,.0f} | Weight: {allocation:.1f}%\n"
analysis_text += f" └─ Concentration Risk: {risk_level}\n\n"
# Sector analysis with advanced metrics
sectors = {}
sector_values = {}
for holding in holdings:
sector = holding.get("sector", "Unknown")
allocation = holding.get("allocation", 0)
value = holding.get("value", 0)
sectors[sector] = sectors.get(sector, 0) + allocation
sector_values[sector] = sector_values.get(sector, 0) + value
if sectors:
analysis_text += "## 🏭 **Sector Diversification Analysis**\n"
sorted_sectors = sorted(
sectors.items(), key=lambda x: x[1], reverse=True
)
for sector, allocation in sorted_sectors:
bar = "█" * min(int(allocation / 2), 30)
value = sector_values.get(sector, 0)
# Sector concentration assessment
if allocation > 40:
risk_emoji = "🔴"
risk_text = "Over-concentrated"
elif allocation > 25:
risk_emoji = "🟡"
risk_text = "Moderate concentration"
else:
risk_emoji = "🟢"
risk_text = "Well diversified"
analysis_text += f"**{sector}**: {allocation:.1f}% (${value:,.2f}) {risk_emoji}\n"
analysis_text += f" └─ {bar} {risk_text}\n\n"
# Portfolio Health Metrics
analysis_text += "## 🎯 **Portfolio Health Assessment**\n"
# Diversification Score
num_sectors = len(sectors)
if num_sectors >= 8:
diversification = "🟢 Excellent"
elif num_sectors >= 5:
diversification = "🟡 Good"
else:
diversification = "🔴 Poor"
analysis_text += f"- **Sector Diversification**: {diversification} ({num_sectors} sectors)\n"
# Concentration Risk
if holdings:
top_3_allocation = sum(
sorted([h.get("allocation", 0) for h in holdings], reverse=True)[:3]
)
if top_3_allocation > 60:
concentration_risk = "🔴 High"
elif top_3_allocation > 40:
concentration_risk = "🟡 Medium"
else:
concentration_risk = "🟢 Low"
analysis_text += f"- **Concentration Risk**: {concentration_risk} (Top 3: {top_3_allocation:.1f}%)\n"
# Portfolio Recommendations
analysis_text += "\n## 💡 **Portfolio Optimization Recommendations**\n"
# Check for over-concentration
for holding in holdings:
allocation = holding.get("allocation", 0)
if allocation > 25:
analysis_text += f"⚠️ **{holding.get('symbol', 'Unknown')}** is over-weighted at {allocation:.1f}% (consider rebalancing)\n"
# Sector recommendations
for sector, allocation in sectors.items():
if allocation > 40:
analysis_text += f"⚠️ **{sector}** sector over-weighted at {allocation:.1f}% (consider diversification)\n"
# Diversification suggestions
if num_sectors < 5:
analysis_text += "💡 **Consider adding exposure to more sectors for better diversification**\n"
if len(holdings) < 10:
analysis_text += (
"💡 **Consider adding more holdings to reduce single-stock risk**\n"
)
return analysis_text
except Exception as e:
return f"Error analyzing portfolio data: {str(e)}"
elif data_type == "stock":
try:
stock_data = json.loads(data)
symbol = stock_data.get("symbol", "Unknown")
price_str = stock_data.get("current_price", "0")
analysis_text = f"📈 **Comprehensive Stock Analysis: {symbol}**\n\n"
# Company Overview
analysis_text += "## 🏢 **Company Overview**\n"
analysis_text += f"- **Symbol**: {symbol}\n"
analysis_text += f"- **Current Price**: {price_str}\n"
analysis_text += f"- **Company**: {stock_data.get('company_name', 'N/A')}\n"
analysis_text += f"- **Sector**: {stock_data.get('sector', 'N/A')}\n"
analysis_text += f"- **Industry**: {stock_data.get('industry', 'N/A')}\n"
analysis_text += (
f"- **Market Cap**: {stock_data.get('market_cap', 'N/A')}\n\n"
)
# Financial Metrics
financials = stock_data.get("financials", {})
if financials:
analysis_text += "## 💹 **Key Financial Metrics**\n"
# Valuation metrics
pe_ratio = financials.get("pe_ratio", "N/A")
pb_ratio = financials.get("pb_ratio", "N/A")
ps_ratio = financials.get("ps_ratio", "N/A")
analysis_text += f"- **P/E Ratio**: {pe_ratio}"
if pe_ratio != "N/A" and isinstance(pe_ratio, (int, float)):
if pe_ratio < 15:
analysis_text += " 🟢 (Undervalued)"
elif pe_ratio > 25:
analysis_text += " 🔴 (Potentially Overvalued)"
else:
analysis_text += " 🟡 (Fairly Valued)"
analysis_text += "\n"
analysis_text += f"- **P/B Ratio**: {pb_ratio}\n"
analysis_text += f"- **P/S Ratio**: {ps_ratio}\n"
# Profitability metrics
analysis_text += f"- **ROE**: {financials.get('roe', 'N/A')}\n"
analysis_text += f"- **ROA**: {financials.get('roa', 'N/A')}\n"
analysis_text += (
f"- **Profit Margin**: {financials.get('profit_margin', 'N/A')}\n"
)
analysis_text += f"- **Revenue Growth**: {financials.get('revenue_growth', 'N/A')}\n\n"
# Performance analysis with trend indicators
performance = stock_data.get("performance", {})
if performance:
analysis_text += "## 📊 **Performance Analysis**\n"
periods = ["1d", "1w", "1m", "3m", "6m", "1y", "ytd"]
for period in periods:
if period in performance:
return_pct = performance[period]
# Add trend indicators
if isinstance(return_pct, str) and "%" in return_pct:
try:
pct_value = float(return_pct.replace("%", ""))
if pct_value > 0:
trend = "📈"
elif pct_value < 0:
trend = "📉"
else:
trend = "➡️"
except:
trend = ""
else:
trend = ""
analysis_text += (
f"- **{period.upper()}**: {return_pct} {trend}\n"
)
analysis_text += "\n"
# Advanced Risk Assessment
risk_data = stock_data.get("risk_assessment", {})
if risk_data:
analysis_text += "## ⚠️ **Risk Assessment**\n"
risk_level = risk_data.get("risk_level", "N/A")
volatility = risk_data.get("volatility_30d", "N/A")
beta = risk_data.get("beta", "N/A")
# Risk level with emoji indicators
if risk_level.lower() == "low":
risk_emoji = "🟢"
elif risk_level.lower() == "medium":
risk_emoji = "🟡"
elif risk_level.lower() == "high":
risk_emoji = "🔴"
else:
risk_emoji = ""
analysis_text += f"- **Risk Level**: {risk_level} {risk_emoji}\n"
analysis_text += f"- **30-Day Volatility**: {volatility}\n"
analysis_text += f"- **Beta**: {beta}"
if beta != "N/A" and isinstance(beta, (int, float)):
if beta > 1.2:
analysis_text += " (High volatility vs market)"
elif beta < 0.8:
analysis_text += " (Low volatility vs market)"
else:
analysis_text += " (Similar to market)"
analysis_text += "\n\n"
# Technical Analysis
technical = stock_data.get("technical_analysis", {})
if technical:
analysis_text += "## 📈 **Technical Analysis**\n"
analysis_text += f"- **50-Day MA**: {technical.get('ma_50', 'N/A')}\n"
analysis_text += f"- **200-Day MA**: {technical.get('ma_200', 'N/A')}\n"
analysis_text += f"- **RSI**: {technical.get('rsi', 'N/A')}\n"
analysis_text += (
f"- **Support Level**: {technical.get('support', 'N/A')}\n"
)
analysis_text += (
f"- **Resistance Level**: {technical.get('resistance', 'N/A')}\n\n"
)
# Investment Recommendation with detailed reasoning
recommendation = stock_data.get("recommendation", {})
if recommendation:
action = recommendation.get("action", "N/A")
confidence = recommendation.get("confidence", "N/A")
reasoning = recommendation.get("reasoning", "")
analysis_text += "## 💡 **Investment Recommendation**\n"
# Action with emoji
if action.lower() == "buy":
action_emoji = "🟢"
elif action.lower() == "sell":
action_emoji = "🔴"
elif action.lower() == "hold":
action_emoji = "🟡"
else:
action_emoji = ""
analysis_text += f"- **Action**: {action} {action_emoji}\n"
analysis_text += f"- **Confidence**: {confidence}\n"
if reasoning:
analysis_text += f"- **Reasoning**: {reasoning}\n"
analysis_text += "\n"
# Additional Investment Considerations
analysis_text += "## 🎯 **Investment Considerations**\n"
# Dividend info
dividend_yield = stock_data.get("dividend_yield", "N/A")
if dividend_yield != "N/A":
analysis_text += f"- **Dividend Yield**: {dividend_yield}\n"
# Analyst ratings
analyst_rating = stock_data.get("analyst_rating", "N/A")
if analyst_rating != "N/A":
analysis_text += f"- **Analyst Rating**: {analyst_rating}\n"
# Price targets
price_target = stock_data.get("price_target", "N/A")
if price_target != "N/A":
analysis_text += f"- **Price Target**: {price_target}\n"
# ESG score
esg_score = stock_data.get("esg_score", "N/A")
if esg_score != "N/A":
analysis_text += f"- **ESG Score**: {esg_score}\n"
return analysis_text
except Exception as e:
return f"Error analyzing stock data: {str(e)}"
return None
def determine_intended_tool(message):
"""Determine which tool the AI intends to use based on the message"""
message_lower = message.lower()
tool_detection_map = {
"budget_planner": [
"budget",
"income",
"expense",
"spending",
"allocat",
"monthly",
"plan",
"financial plan",
"money",
"track",
"categoriz",
"cost",
],
"investment_analyzer": [
"stock",
"invest",
"buy",
"sell",
"analyze",
"AAPL",
"GOOGL",
"TSLA",
"share",
"equity",
],
"portfolio_analyzer": [
"portfolio",
"holdings",
"allocation",
"diversif",
"asset",
"position",
],
"market_trends": [
"market",
"trend",
"news",
"sector",
"economic",
"latest",
"current",
],
}
tool_names = {
"budget_planner": "Budget Planner",
"investment_analyzer": "Investment Analyzer",
"market_trends": "Market Trends Analyzer",
"portfolio_analyzer": "Portfolio Analyzer",
}
for tool_key, keywords in tool_detection_map.items():
if any(keyword in message_lower for keyword in keywords):
return tool_key, tool_names.get(tool_key, tool_key)
return None, None
def determine_response_type(message):
"""Determine if user wants detailed report or short response"""
message_lower = message.lower()
# Keywords indicating detailed response preference
detailed_keywords = [
"detailed",
"detail",
"comprehensive",
"thorough",
"in-depth",
"full analysis",
"complete",
"report",
"breakdown",
"explain",
"elaborate",
"deep dive",
"extensive",
"detailed analysis",
"full report",
"comprehensive report",
]
# Keywords indicating short response preference
short_keywords = [
"quick",
"brief",
"short",
"summary",
"concise",
"simple",
"fast",
"just tell me",
"quickly",
"in short",
"tldr",
"bottom line",
]
# Check for detailed indicators first
if any(keyword in message_lower for keyword in detailed_keywords):
return "detailed"
# Check for short indicators
if any(keyword in message_lower for keyword in short_keywords):
return "short"
# Default to short response
return "short"
def process_financial_query(message, history):
"""Process user queries through the financial agent with streaming response"""
# Get the actual user message from the last entry in history
if not history or len(history) == 0:
return history
# Extract the last user message
last_user_message = None
for msg in reversed(history):
if msg["role"] == "user":
last_user_message = msg["content"]
break
if not last_user_message:
return history
# Convert Gradio history to agent format (excluding the last user message we just added)
agent_history = []
for i, msg in enumerate(history[:-1]): # Exclude the last message
agent_history.append(
{
"role": msg["role"],
"content": msg["content"]
if isinstance(msg["content"], str)
else str(msg["content"]),
}
)
# Start timer
start_time = time.time()
init_message_start_index = len(history)
try:
# Show what tool will be used and processing status
intended_tool_key, intended_tool_name = determine_intended_tool(
last_user_message
)
response_type = determine_response_type(last_user_message)
# Always show status for all tools with expected time estimates
if intended_tool_name:
if intended_tool_key == "market_trends":
status_msg = "🔍 Fetching market news & analyzing trends (estimated 20-30 seconds)..."
elif intended_tool_key == "investment_analyzer":
status_msg = "📈 Analyzing stock data & calculating metrics (estimated 10-15 seconds)..."
elif intended_tool_key == "budget_planner":
status_msg = "💰 Processing budget analysis (estimated 5-10 seconds)..."
elif intended_tool_key == "portfolio_analyzer":
status_msg = "📊 Analyzing portfolio data (estimated 8-12 seconds)..."
else:
status_msg = (
f"🔄 Using {intended_tool_name} (estimated 5-15 seconds)..."
)
history.append(ChatMessage(role="assistant", content=status_msg))
yield history
else:
# If no tool detected, show generic processing message
history.append(
ChatMessage(
role="assistant",
content="🧠 Processing your request (estimated 10-15 seconds)...",
)
)
yield history
# Process message through agent
response, tool_used, tool_result, all_tools, all_results = (
agent.process_message_with_details(last_user_message, agent_history)
)
# Clear the processing message now that tool is complete
if len(history) > init_message_start_index:
history.pop() # Remove the processing message
# Step 5: Show tool execution results
if all_tools and all_results:
# Remove initialization messages but keep all previous conversation and tool info
history = history[:init_message_start_index]
tool_names = {
"budget_planner": "Budget Planner",
"investment_analyzer": "Investment Analyzer",
"market_trends": "Market Trends Analyzer",
"portfolio_analyzer": "Portfolio Analyzer",
}
tool_emojis = {
"Budget Planner": "💰",
"Investment Analyzer": "📈",
"Market Trends Analyzer": "📰",
"Portfolio Analyzer": "📊",
}
# Show results for all tools used
for i, (used_tool, result) in enumerate(zip(all_tools, all_results)):
tool_display_name = tool_names.get(used_tool, used_tool)
if result:
# Format tool result for display
try:
import json
if result.startswith("{") or result.startswith("["):
# Pretty format JSON output
parsed_result = json.loads(result)
# Truncate very long results for display
if len(str(parsed_result)) > 2000:
# Show summary for long results
if isinstance(parsed_result, dict):
summary = {
k: f"[{type(v).__name__}]"
if isinstance(v, (list, dict))
else v
for k, v in list(parsed_result.items())[:10]
}
display_result = f"```json\n{json.dumps(summary, indent=2)}\n... (truncated)\n```"
else:
display_result = f"```json\n{json.dumps(parsed_result, indent=2)[:1000]}...\n```"
else:
formatted_result = json.dumps(parsed_result, indent=2)
display_result = f"```json\n{formatted_result}\n```"
else:
# Truncate non-JSON results
display_result = (
result[:1000] + "..." if len(result) > 1000 else result
)
except Exception:
display_result = (
str(result)[:1000] + "..."
if len(str(result)) > 1000
else str(result)
)
tool_emoji = tool_emojis.get(tool_display_name, "🔧")
collapsible_content = f"""
{tool_emoji} {tool_display_name} Results - Click to expand
{display_result}
🔍 {tool_display_name} Data Analysis - Click to expand
{analysis_data}