Abid Ali Awan
Refactor portfolio analyzer in FinancialTools: simplified input extraction and handling, improved default portfolio logic, and enhanced analysis output with basic recommendations for diversification.
27965d3
import json | |
import operator | |
import re | |
from typing import Annotated, List, Tuple, TypedDict, Union | |
from langchain.agents import AgentExecutor, create_openai_tools_agent | |
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain.schema import AIMessage, HumanMessage, SystemMessage | |
from langchain.tools import Tool | |
from langchain_openai import ChatOpenAI | |
class AgentState(TypedDict):
    """Typed dictionary describing the agent's conversational state."""

    # Chat transcript. The Annotated metadata attaches ``operator.add`` to the
    # field; presumably a graph framework uses it to merge updates by list
    # concatenation -- TODO confirm, nothing in this file consumes it.
    messages: Annotated[List[Union[HumanMessage, AIMessage]], operator.add]
    # Free-form side data carried alongside the messages.
    context: dict
class FinancialAdvisorAgent:
    """Financial advisor that routes user messages to tools and an LLM.

    Simple messages that match exactly one tool's keywords take a quick path:
    the tool is called directly and the LLM only summarises its output.
    Everything else (several tools detected, no tool detected, or more than
    15 words) is delegated to a LangChain tool-calling agent.
    """

    # Lowercase substrings that suggest a tool; insertion order is the
    # detection order.
    _TOOL_KEYWORDS = {
        "budget_planner": [
            "budget", "income", "expense", "spending", "allocat", "track", "categoriz",
        ],
        "investment_analyzer": ["stock", "invest", "buy", "sell", "analyze"],
        "portfolio_analyzer": ["portfolio", "holdings", "allocation", "diversif"],
        "market_trends": ["market", "trend", "news", "sector", "economic"],
    }
    # Messages longer than this many whitespace-separated words go to the agent.
    _MAX_QUICK_PATH_WORDS = 15

    # Heuristic ticker matcher: 2-5 consecutive capitals as a whole word.
    # NOTE(review): this also matches ordinary acronyms such as "ETF" or "USA".
    _SYMBOL_PATTERN = re.compile(r"\b[A-Z]{2,5}\b")
    _DEFAULT_SYMBOL = "AAPL"

    # Building blocks for the budget regexes.
    _AMOUNT = r"(\d+(?:,\d{3})*(?:\.\d{2})?)"
    # Optional glue between a label and its amount: "rent is 1200", "rent: 1200".
    _CONNECTOR = r"\s*(?:(?:is|of|was|:|=)\s*)?"
    _EXPENSE_CATEGORIES = ("rent", "food", "utilities", "transportation")
    _DEFAULT_INCOME = 5000

    _SHORT_SYSTEM_MESSAGE = "Financial advisor. Be concise and actionable."
    _DETAILED_SYSTEM_MESSAGE = (
        "You are a professional financial advisor. Provide comprehensive, "
        "detailed advice based on the analysis results. Be thorough and educational."
    )

    def __init__(self, tools: "List[Tool]", openai_api_key: str):
        """Build the LLM, the prompt and the tool-calling agent executor.

        Args:
            tools: LangChain tools the agent may call; also indexed by
                ``tool.name`` for the quick single-tool path.
            openai_api_key: API key handed to ``ChatOpenAI``.
        """
        self.tools = tools
        self.llm = ChatOpenAI(
            api_key=openai_api_key, model="gpt-4.1-mini-2025-04-14", temperature=0.7
        )
        self.tools_by_name = {tool.name: tool for tool in tools}

        # Create agent with tools
        self.system_prompt = (
            "You are a professional financial advisor AI assistant with access to specialized tools.\n"
            "Available tools:\n"
            "- budget_planner: Use when users ask about budgeting, income allocation, or expense planning. Input should be JSON with 'income' and 'expenses' keys.\n"
            "- investment_analyzer: Use when users ask about specific stocks or investments. Input should be a stock symbol (e.g., AAPL).\n"
            "- market_trends: Use when users ask about market trends or financial news. Input should be a search query.\n"
            "- portfolio_analyzer: Use when users want to analyze their portfolio. Input should be JSON with 'holdings' array.\n"
            "IMPORTANT: You MUST use these tools when answering financial questions. Do not provide generic advice without using the appropriate tool first.\n"
            "When a user asks a question:\n"
            "1. Identify which tool is most appropriate\n"
            "2. Extract or request the necessary information\n"
            "3. Use the tool to get specific data\n"
            "4. Provide advice based on the tool's output"
        )
        self.prompt = ChatPromptTemplate.from_messages(
            [
                ("system", self.system_prompt),
                MessagesPlaceholder(variable_name="messages"),
                ("human", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ]
        )
        self.agent = create_openai_tools_agent(self.llm, self.tools, self.prompt)
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            return_intermediate_steps=True,
        )

    def _extract_tool_usage(self, intermediate_steps):
        """Summarise which tools an agent run called.

        Args:
            intermediate_steps: Iterable of ``(action, result)`` pairs as
                returned by the executor; actions without a ``tool``
                attribute are ignored.

        Returns:
            ``(last_tool, last_result, all_tools, all_results)``. The first
            two are kept for backward compatibility with single-tool callers
            and are ``None`` (with empty lists) when no tool was called.
        """
        tools_used = []
        tool_results = []
        for action, result in intermediate_steps:
            if hasattr(action, "tool"):
                tools_used.append(action.tool)
                tool_results.append(result)

        if not tools_used:
            return None, None, [], []
        return tools_used[-1], tool_results[-1], tools_used, tool_results

    def _prepare_tool_input(self, message: str, tool_name: str) -> str:
        """Turn a free-text message into the input string a tool expects.

        * ``investment_analyzer``: first token of 2-5 capital letters, or
          ``"AAPL"`` when none is present.
        * ``budget_planner``: JSON with ``income`` (default 5000) and an
          ``expenses`` dict for rent, food, utilities and transportation.
        * any other tool: the message unchanged.
        """
        if tool_name == "investment_analyzer":
            symbol = self._SYMBOL_PATTERN.search(message)
            return symbol.group(0) if symbol else self._DEFAULT_SYMBOL

        if tool_name == "budget_planner":
            # Original phrasing first ("5,000 monthly income") so existing
            # inputs keep their meaning; then the amount-after-word form
            # ("income is $5,000", "income: 5000"), which previously fell
            # through to the default silently.
            income_match = re.search(
                r"\$?" + self._AMOUNT + r"\s*(?:monthly\s*)?income", message, re.I
            ) or re.search(
                r"income" + self._CONNECTOR + r"\$?" + self._AMOUNT, message, re.I
            )
            income = (
                float(income_match.group(1).replace(",", ""))
                if income_match
                else self._DEFAULT_INCOME
            )

            expenses = {}
            for category in self._EXPENSE_CATEGORIES:
                match = re.search(
                    category + self._CONNECTOR + r"\$?" + self._AMOUNT, message, re.I
                )
                if match:
                    expenses[category] = float(match.group(1).replace(",", ""))
            return json.dumps({"income": income, "expenses": expenses})

        # portfolio_analyzer, market_trends and unknown tools take the raw text.
        return message

    def _detect_tools(self, message: str) -> List[str]:
        """Return the names of tools whose keywords occur in ``message``."""
        message_lower = message.lower()
        detected = []
        for tool_name, keywords in self._TOOL_KEYWORDS.items():
            if not any(word in message_lower for word in keywords):
                continue
            if tool_name == "investment_analyzer":
                # Generic verbs such as "buy" or "analyze" are not enough:
                # require a ticker-like token or an explicit investing word.
                has_symbol = self._SYMBOL_PATTERN.search(message)
                has_hint = any(
                    word in message_lower for word in ("stock", "invest", "recommend")
                )
                if not (has_symbol or has_hint):
                    continue
            detected.append(tool_name)
        return detected

    @staticmethod
    def _history_to_messages(history):
        """Convert chat history dicts into LangChain messages.

        NOTE(review): assumes entries look like
        ``{"role": "user" | "assistant", "content": str}`` -- confirm against
        the caller. Entries of any other shape are skipped, so an unexpected
        history format degrades to the previous "no history" behaviour.
        """
        role_to_message = {
            "user": HumanMessage,
            "human": HumanMessage,
            "assistant": AIMessage,
            "ai": AIMessage,
        }
        converted = []
        for entry in history or []:
            if not isinstance(entry, dict):
                continue
            role = entry.get("role")
            content = entry.get("content")
            if not isinstance(role, str) or not isinstance(content, str) or not content:
                continue
            message_cls = role_to_message.get(role)
            if message_cls is not None:
                converted.append(message_cls(content=content))
        return converted

    @staticmethod
    def _short_prompt(message: str, tool_result, selected_tool: str) -> str:
        """Build the concise-summary prompt shared by both response paths."""
        return (
            f"Based on this {selected_tool.replace('_', ' ')} analysis, "
            f"provide a concise financial summary for: {message}\n"
            f"Data: {tool_result}\n"
            "Keep response under 200 words with key insights and 2-3 actionable recommendations."
        )

    @staticmethod
    def _detailed_prompt(message: str, tool_result, selected_tool: str) -> str:
        """Build the long-form advice prompt used by ``stream_response``."""
        return (
            "Based on the following comprehensive analysis from the "
            f"{selected_tool.replace('_', ' ').title()}:\n"
            f"{tool_result}\n"
            f"Provide detailed financial advice to the user addressing their question: {message}\n"
            "Guidelines:\n"
            "- Be thorough and comprehensive\n"
            "- Reference specific data points from the analysis\n"
            "- Provide clear, actionable recommendations with explanations\n"
            "- Include multiple scenarios or considerations where relevant\n"
            "- Use a professional but friendly tone\n"
            "- Structure your response with clear sections\n"
            "- Provide context for your recommendations"
        )

    def _run_agent(self, message: str, history):
        """Run the tool-calling agent and return the standard 5-tuple."""
        try:
            result = self.agent_executor.invoke(
                {"input": message, "messages": self._history_to_messages(history)}
            )
            tool_used, tool_result, all_tools, all_results = self._extract_tool_usage(
                result.get("intermediate_steps", [])
            )
            return result["output"], tool_used, tool_result, all_tools, all_results
        except Exception as e:
            # Boundary handler: callers render the text instead of crashing.
            return (
                f"I encountered an error processing your request: {str(e)}",
                None,
                None,
                [],
                [],
            )

    def _run_single_tool(self, message: str, selected_tool: str):
        """Call one tool directly, then have the LLM summarise its output."""
        try:
            tool = self.tools_by_name[selected_tool]
            tool_input = self._prepare_tool_input(message, selected_tool)
            tool_result = tool.func(tool_input)

            # One short completion instead of a full agent loop, for speed.
            response = self.llm.invoke(
                [
                    SystemMessage(content=self._SHORT_SYSTEM_MESSAGE),
                    HumanMessage(
                        content=self._short_prompt(message, tool_result, selected_tool)
                    ),
                ]
            )
            return (
                response.content,
                selected_tool,
                tool_result,
                [selected_tool],
                [tool_result],
            )
        except Exception as e:
            return f"Error using {selected_tool}: {str(e)}", selected_tool, None, [], []

    def process_message_with_details(
        self, message: str, history: Union[List[dict], None] = None
    ) -> Tuple[str, Union[str, None], Union[str, None], List[str], List[str]]:
        """Answer ``message`` and report which tools were used.

        Args:
            message: The user's question.
            history: Earlier conversation turns. They are forwarded to the
                agent path only; the quick single-tool path does not use them.

        Returns:
            ``(response, tool_used, tool_result, all_tools, all_results)``.
            Failures are reported in ``response`` rather than raised; in that
            case ``tool_result`` is ``None`` and both lists are empty.
        """
        if history is None:
            history = []

        detected_tools = self._detect_tools(message)
        is_short = len(message.split()) <= self._MAX_QUICK_PATH_WORDS

        # Quick path only for short, unambiguous, single-tool questions.
        if len(detected_tools) == 1 and is_short:
            return self._run_single_tool(message, detected_tools[0])

        # Multi-tool, long, or unclear queries: let the agent decide.
        return self._run_agent(message, history)

    def process_message(self, message: str, history: Union[List[dict], None] = None):
        """Process a user message and return only the response text."""
        response, _, _, _, _ = self.process_message_with_details(message, history)
        return response

    def stream_response(
        self,
        message: str,
        tool_result: str,
        selected_tool: str,
        response_type: str = "short",
    ):
        """Stream the LLM's advice for an already-computed tool result.

        Args:
            message: The user's original question.
            tool_result: Output of ``selected_tool`` to ground the answer in.
            selected_tool: Tool name, used only to label the prompt.
            response_type: ``"detailed"`` for long-form advice; any other
                value produces the concise summary.

        Yields:
            Non-empty content chunks as the model produces them.
        """
        if response_type == "detailed":
            response_prompt = self._detailed_prompt(message, tool_result, selected_tool)
            system_message = self._DETAILED_SYSTEM_MESSAGE
        else:
            response_prompt = self._short_prompt(message, tool_result, selected_tool)
            system_message = self._SHORT_SYSTEM_MESSAGE

        messages = [
            SystemMessage(content=system_message),
            HumanMessage(content=response_prompt),
        ]

        # Stream the response token by token
        for chunk in self.llm.stream(messages):
            if chunk.content:
                yield chunk.content