agentic_ai / app.py
uditk99's picture
Upload 2 files
2d4ed0d verified
# -*- coding: utf-8 -*-
"""v1_Multi_Agent.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1Whj6LVa2_xvNcS8JyXToLCNOBwx6wn7K
"""
# # === Cell 1: Setup & Installs ===
# import gc, os, sys
# _ = [gc.collect() for _ in range(3)]
# # Core libs (quiet)
# !pip -q install yfinance requests pandas numpy matplotlib tqdm \
# langchain langchain-community langgraph transformers accelerate \
# faiss-cpu sentence-transformers gnews neo4j scipy tabulate gradio
# # Quiet Transformers logs
# os.environ["TRANSFORMERS_VERBOSITY"] = "error"
# try:
# from transformers.utils import logging as hf_logging
# hf_logging.set_verbosity_error()
# except Exception:
# pass
# print("βœ… Environment ready.")
# Cell 2 β€” Config & Globals
import os, gc, math, json, re, time, requests, numpy as np, pandas as pd
from datetime import datetime, timedelta, timezone
# Clean memory a bit when re-running
_ = [gc.collect() for _ in range(3)]
pd.set_option("display.max_columns", None)
# ---- Keys / URIs ----
NEWSAPI_KEY = "866bf47e4ad34118af6634a1020bce96" # your key (NewsAPI.org)
NEO4J_URI = "neo4j+s://82fe4549.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "CZMkO1HLvPhDf3mjzw71szMeGAfRSAw9BaTcZpHpaGs"
ENABLE_NEO4J = True # << TURNED ON
# ---- Constants ----
RISK_FREE_RATE = 0.03
NEWS_LOOKBACK_DAYS = 14
PRICE_LOOKBACK_DAYS = 365 * 2
MAX_ARTICLES = 60
MAX_NEWS_PER_TICKER = 30
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# ---- Device/logging tweaks ----
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
try:
from transformers.utils import logging as hf_logging
hf_logging.set_verbosity_error()
except Exception:
pass
def today_utc_date():
return datetime.now(timezone.utc).date()
def days_ago(n):
return today_utc_date() - timedelta(days=n)
# Cell 3 β€” Common helpers: symbol resolve, embedder, Neo4j driver (UPDATED)
import yfinance as yf
import unicodedata, string, re, requests
def _clean_text(q: str) -> str:
"""Normalize and strip invisible Unicode so 'AAPL' always parses."""
if q is None:
return ""
q = unicodedata.normalize("NFKC", str(q))
# Remove common zero-width / directional marks
for ch in ("\u200b", "\u200c", "\u200d", "\u200e", "\u200f", "\u202a", "\u202b", "\u202c", "\u202d", "\u202e"):
q = q.replace(ch, "")
# Keep only printable characters
q = "".join(ch for ch in q if ch.isprintable())
return q.strip()
def parse_user_query(q: str):
q = _clean_text(q)
q_up = q.upper()
# Exact ticker like AAPL, TSLA, SPY (letters only, up to 6)
if re.fullmatch(r"[A-Z]{1,6}", q_up):
return q_up, "maybe_ticker"
# Grab the first contiguous A–Z token up to 6 chars (ignore word boundaries)
hits = re.findall(r"[A-Z]{1,6}", q_up)
if hits:
return hits[0], "maybe_ticker"
# Otherwise treat as a name to search
name = re.sub(r"(what.*price of|can i invest in|stock price of|suggest|recommend|optimi[sz]e)",
"", q, flags=re.I).strip(" ?")
return (name if name else q), "maybe_name"
def yahoo_symbol_search(query: str, exchanges=None):
"""Yahoo search without over-filtering exchange names (Yahoo returns 'NasdaqGS', 'NMS', etc.)."""
url = "https://query2.finance.yahoo.com/v1/finance/search"
params = {"q": query, "quotesCount": 10, "newsCount": 0}
try:
r = requests.get(url, params=params, timeout=10)
r.raise_for_status()
data = r.json()
except Exception:
return []
out = []
for q in data.get("quotes", []):
qtype = q.get("quoteType")
if qtype in ("EQUITY", "ETF", "MUTUALFUND", "INDEX", "CRYPTOCURRENCY"):
out.append({
"symbol": q.get("symbol"),
"shortname": q.get("shortname"),
"longname": q.get("longname"),
"exchange": q.get("exchange"),
})
return out
def resolve_to_ticker(user_text: str):
token, kind = parse_user_query(user_text)
if kind == "maybe_ticker" and token:
return token
matches = yahoo_symbol_search(token)
if matches:
return matches[0]["symbol"]
# Last resort: pick first A–Z run
hits = re.findall(r"[A-Z]{1,6}", (token or "").upper())
if hits:
return hits[0]
raise ValueError(f"Could not resolve '{user_text}' to a ticker.")
# ---- Shared embedder (one per runtime)
from sentence_transformers import SentenceTransformer
_embedder = SentenceTransformer(EMBED_MODEL_NAME, device="cpu") # CPU is fine for MiniLM
# ---- Neo4j driver (one per runtime)
from neo4j import GraphDatabase
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) if ENABLE_NEO4J else None
def _have_neo4j_driver():
return ENABLE_NEO4J and (driver is not None)
# Cell 4 β€” Shared TinyLlama chat writer for short summaries
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
LLM_ID = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
tok = AutoTokenizer.from_pretrained(LLM_ID, use_fast=True)
llm_model = AutoModelForCausalLM.from_pretrained(LLM_ID, device_map="auto", torch_dtype="auto")
gen_pipe = pipeline("text-generation", model=llm_model, tokenizer=tok, max_new_tokens=600, do_sample=False)
def chat_summarize(system_msg: str, user_msg: str) -> str:
chat = [{"role":"system","content":system_msg},{"role":"user","content":user_msg}]
try:
prompt = tok.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
out = gen_pipe(prompt, return_full_text=False)[0]["generated_text"].strip()
return out
except Exception:
out = gen_pipe(user_msg, return_full_text=False)[0]["generated_text"].strip()
return out
# Cell 5 β€” Historical Trends Agent (UPDATED)
import matplotlib.pyplot as plt
import faiss
from langchain.tools import tool
import numpy as np, pandas as pd, math, json, re
from datetime import timedelta
# ---------- Data & KPIs ----------
def get_last_year_data(ticker: str):
end = today_utc_date()
start = end - timedelta(days=365)
df = yf.download(ticker, start=str(start), end=str(end + timedelta(days=1)),
auto_adjust=False, progress=False)
if df.empty:
raise ValueError(f"No data for {ticker}.")
df = df[["Open","High","Low","Close","Adj Close","Volume"]].copy()
df.index = pd.to_datetime(df.index)
return df
def _to_scalar(x):
try:
return float(getattr(x, "item", lambda: x)())
except Exception:
try: return float(x)
except Exception:
return float(np.asarray(x).reshape(-1)[0])
def compute_kpis(df: pd.DataFrame, risk_free_rate=RISK_FREE_RATE):
adj = df["Adj Close"]
if isinstance(adj, pd.DataFrame) and adj.shape[1]==1:
adj = adj.squeeze("columns")
adj = pd.to_numeric(adj, errors="coerce").dropna()
n = int(adj.shape[0])
if n < 2:
raise ValueError("Not enough data points to compute KPIs.")
start_price = _to_scalar(adj.iloc[0])
end_price = _to_scalar(adj.iloc[-1])
total_return = (end_price / start_price) - 1.0
rets = adj.pct_change().dropna()
cagr = (1.0 + total_return) ** (252.0 / n) - 1.0
ann_vol = _to_scalar(rets.std()) * np.sqrt(252.0)
sharpe = (cagr - risk_free_rate) / (ann_vol + 1e-12)
cum_max = adj.cummax()
drawdowns = adj / cum_max - 1.0
max_dd = _to_scalar(drawdowns.min())
bd_dt = rets.idxmax(); wd_dt = rets.idxmin()
def _fmt_date(d):
try: return pd.to_datetime(d).strftime("%Y-%m-%d")
except Exception: return str(d)
best_day = (_fmt_date(bd_dt), _to_scalar(rets.max()))
worst_day = (_fmt_date(wd_dt), _to_scalar(rets.min()))
monthly = adj.resample("ME").last().pct_change().dropna()
return {
"start_price": float(start_price), "end_price": float(end_price),
"total_return": float(total_return), "cagr": float(cagr),
"ann_vol": float(ann_vol), "sharpe": float(sharpe),
"max_drawdown": float(max_dd), "best_day": (best_day[0], float(best_day[1])),
"worst_day": (worst_day[0], float(worst_day[1])),
"monthly_table": monthly.to_frame("Monthly Return"), "n_days": int(n),
}
# ---------- Neo4j write ----------
def store_to_neo4j(ticker: str, df: pd.DataFrame):
if not _have_neo4j_driver(): return
info = yf.Ticker(ticker).info or {}
name = info.get("shortName") or info.get("longName") or ticker
exchange = info.get("exchange") or info.get("fullExchangeName") or "UNKNOWN"
rows = []
for d, r in df.iterrows():
rows.append({
"id": f"{ticker}_{d.date().isoformat()}",
"date": d.date().isoformat(),
"open": float(r["Open"]), "high": float(r["High"]), "low": float(r["Low"]),
"close": float(r["Close"]), "adj_close": float(r["Adj Close"]),
"volume": int(r["Volume"]) if not np.isnan(r["Volume"]) else 0,
})
with driver.session() as session:
session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (s:Stock) REQUIRE s.symbol IS UNIQUE;")
session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (p:PriceBar) REQUIRE p.id IS UNIQUE;")
session.run("""MERGE (s:Stock {symbol:$symbol}) SET s.name=$name, s.exchange=$ex""",
symbol=ticker, name=name, ex=exchange)
chunk = 250
for i in range(0, len(rows), chunk):
session.run("""
UNWIND $rows AS r
MERGE (p:PriceBar {id:r.id})
SET p.date=r.date, p.open=r.open, p.high=r.high, p.low=r.low,
p.close=r.close, p.adj_close=r.adj_close, p.volume=r.volume
WITH p
MATCH (s:Stock {symbol:$symbol})
MERGE (s)-[:HAS_PRICE]->(p)
""", rows=rows[i:i+chunk], symbol=ticker)
# ---------- Facts / Retriever ----------
def build_fact_corpus(ticker: str, kpis: dict):
f = []
f.append(f"{ticker} total return over last 1Y: {kpis['total_return']:.6f}")
f.append(f"{ticker} CAGR (last 1Y approximated): {kpis['cagr']:.6f}")
f.append(f"{ticker} annualized volatility (1Y): {kpis['ann_vol']:.6f}")
f.append(f"{ticker} Sharpe ratio (rf={RISK_FREE_RATE:.2%}): {kpis['sharpe']:.4f}")
f.append(f"{ticker} max drawdown (1Y): {kpis['max_drawdown']:.6f}")
f.append(f"{ticker} best day (1Y): {kpis['best_day'][0]} return {kpis['best_day'][1]:.6f}")
f.append(f"{ticker} worst day (1Y): {kpis['worst_day'][0]} return {kpis['worst_day'][1]:.6f}")
f.append(f"{ticker} start price (Adj Close): {kpis['start_price']:.6f}")
f.append(f"{ticker} end price (Adj Close): {kpis['end_price']:.6f}")
f.append(f"{ticker} period days counted: {kpis['n_days']}")
return f
class FactRetriever:
def __init__(self, sentences):
self.sentences = sentences
X = _embedder.encode(sentences, convert_to_numpy=True, normalize_embeddings=True)
self.index = faiss.IndexFlatIP(X.shape[1])
self.index.add(X)
def query(self, q, top_k=5):
qv = _embedder.encode([q], convert_to_numpy=True, normalize_embeddings=True)
D, I = self.index.search(qv, top_k)
return [(self.sentences[i], float(D[0][j])) for j, i in enumerate(I[0])]
# ---------- Tools (LangChain) ----------
_GLOBAL_HIST = {"latest": {}}
@tool
def analyze_last_year(ticker: str) -> str:
"""Fetch last 1Y OHLCV, compute KPIs, build retriever, write Neo4j, return compact JSON."""
df = get_last_year_data(ticker)
kpis = compute_kpis(df, risk_free_rate=RISK_FREE_RATE)
_GLOBAL_HIST["latest"][ticker] = {
"df": df, "kpis": kpis, "retriever": FactRetriever(build_fact_corpus(ticker, kpis))
}
if _have_neo4j_driver():
try: store_to_neo4j(ticker, df)
except Exception: pass
return json.dumps({
"ticker": ticker,
"n_days": kpis["n_days"],
"start_price": kpis["start_price"],
"end_price": kpis["end_price"],
"total_return_pct": kpis["total_return"]*100,
"cagr_pct": kpis["cagr"]*100,
"ann_vol_pct": kpis["ann_vol"]*100,
"sharpe": kpis["sharpe"],
"max_drawdown_pct": kpis["max_drawdown"]*100,
"best_day": kpis["best_day"],
"worst_day": kpis["worst_day"],
})
@tool
def show_monthly_returns(ticker: str) -> str:
"""Return a markdown table of monthly returns (XX.XX%)."""
if ticker not in _GLOBAL_HIST["latest"]:
return "Please run analyze_last_year first."
mt = _GLOBAL_HIST["latest"][ticker]["kpis"]["monthly_table"].copy()
try:
mt.index = pd.to_datetime(mt.index).strftime("%Y-%m")
except Exception:
mt.index = pd.Index([str(x)[:7] for x in mt.index])
mt["Monthly Return"] = (mt["Monthly Return"] * 100.0).map(lambda v: f"{v:.2f}%")
return mt.to_markdown()
@tool
def neo4j_check_latest_close(ticker: str) -> str:
"""Read most recent adj_close for ticker from Neo4j (if enabled)."""
if not _have_neo4j_driver():
return "Neo4j check skipped (ENABLE_NEO4J=False)."
with driver.session() as session:
res = session.run("""
MATCH (s:Stock {symbol:$symbol})-[:HAS_PRICE]->(p:PriceBar)
RETURN p.date AS date, p.adj_close AS adj_close
ORDER BY p.date DESC LIMIT 1
""", symbol=ticker).single()
if not res:
return "Neo4j check: no records yet."
return f"Neo4j latest adj_close for {ticker} on {res['date']}: {float(res['adj_close']):.4f}"
# Safe prettifier (UPDATED: more robust, no regex-in-fstring)
def _prettify_fact_line(line: str) -> str:
s = line.strip()
# Remove any trailing "(score=...)" fragments
s = re.sub(r"\s*\(score=.*?\)\s*$", "", s)
def _as_pct(m, label):
try:
return f"{label}{float(m.group(2))*100:.2f}%"
except Exception:
return m.group(0)
s = re.sub(r"(total return over last 1Y:\s*)([-+]?\d*\.?\d+)", lambda m: _as_pct(m, "Total return (1Y): "), s, flags=re.I)
s = re.sub(r"(CAGR.*?:\s*)([-+]?\d*\.?\d+)", lambda m: _as_pct(m, "CAGR (1Y): "), s, flags=re.I)
s = re.sub(r"(annualized volatility.*?:\s*)([-+]?\d*\.?\d+)",lambda m: _as_pct(m, "Annualized volatility: "), s, flags=re.I)
s = re.sub(r"(max drawdown.*?:\s*)([-+]?\d*\.?\d+)", lambda m: _as_pct(m, "Max drawdown: "), s, flags=re.I)
s = re.sub(r"(Sharpe ratio.*?:\s*)([-+]?\d*\.?\d+)", lambda m: f"Sharpe ratio: {float(m.group(2)):.2f}", s, flags=re.I)
# Best/Worst day β€” rebuild line unconditionally if pattern seen
bm = re.search(r"best day.*?:\s*(\d{4}-\d{2}-\d{2}).*?return\s*([-+]?\d*\.?\d+)", s, flags=re.I)
if bm:
s = re.sub(r"best day.*", f"Best day: {bm.group(1)} (+{float(bm.group(2))*100:.2f}%)", s, flags=re.I)
wm = re.search(r"worst day.*?:\s*(\d{4}-\d{2}-\d{2}).*?return\s*([-+]?\d*\.?\d+)", s, flags=re.I)
if wm:
s = re.sub(r"worst day.*", f"Worst day: {wm.group(1)} ({abs(float(wm.group(2))*100):.2f}% decline)", s, flags=re.I)
# Remove leading "- TICKER" if present
s = re.sub(r"^-\s*[A-Z]{1,6}\s*", "- ", s)
return s
@tool("retrieve_facts")
def retrieve_facts_single(query: str) -> str:
"""INPUT: 'TICKER | question' -> pretty bullets."""
if "|" in query:
ticker, question = [x.strip() for x in query.split("|", 1)]
else:
ticker, question = query.strip(), "performance summary"
if ticker not in _GLOBAL_HIST["latest"]:
return "Please run analyze_last_year first."
hits = _GLOBAL_HIST["latest"][ticker]["retriever"].query(question, top_k=5)
pretty = [_prettify_fact_line(f"- {txt}") for (txt, _score) in hits]
return "\n".join(pretty)
# ---------- LangGraph flow ----------
from langgraph.graph import StateGraph, END
from typing import TypedDict
class HistState(TypedDict, total=False):
ticker: str
analysis_json: str
monthly_md: str
neo4j_line: str
facts_md: str
final_markdown: str
def _fmt2(x):
try: return f"{float(x):.2f}"
except: return "0.00"
def _pros_cons(js):
pros, cons = [], []
tr = float(js.get("total_return_pct",0)); sh = float(js.get("sharpe",0))
vol = float(js.get("ann_vol_pct",0)); mdd = float(js.get("max_drawdown_pct",0))
if tr > 0: pros.append("Positive 1-year total return.")
if sh > 1.0: pros.append("Good risk-adjusted performance (Sharpe > 1).")
if vol < 25.0: pros.append("Moderate volatility profile.")
if abs(mdd) <= 20.0: pros.append("Relatively contained drawdowns.")
if tr <= 0: cons.append("Negative 1-year total return.")
if sh < 0.3: cons.append("Weak risk-adjusted performance (low Sharpe).")
if vol >= 30.0: cons.append("Elevated price volatility.")
if abs(mdd) >= 25.0: cons.append("Deep drawdowns observed.")
if not pros: pros.append("No major positives indicated by last-year metrics.")
if not cons: cons.append("No major cautions indicated by last-year metrics.")
return pros, cons
def n_h_analyze(s: HistState) -> HistState: return {"analysis_json": analyze_last_year.invoke(s["ticker"])}
def n_h_monthly(s: HistState) -> HistState: return {"monthly_md": show_monthly_returns.invoke(s["ticker"])}
def n_h_neo4j(s: HistState) -> HistState: return {"neo4j_line": neo4j_check_latest_close.invoke(s["ticker"])}
def n_h_facts(s: HistState) -> HistState:
q = f"{s['ticker']} | risk-adjusted performance and drawdowns"
return {"facts_md": retrieve_facts_single.invoke(q)}
def n_h_write(s: HistState) -> HistState:
try: k = json.loads(s.get("analysis_json","{}"))
except Exception: k = {}
t = s["ticker"]
tr=_fmt2(k.get("total_return_pct",0)); cg=_fmt2(k.get("cagr_pct",0))
av=_fmt2(k.get("ann_vol_pct",0)); sh=_fmt2(k.get("sharpe",0)); md=_fmt2(k.get("max_drawdown_pct",0))
bd = k.get("best_day",["",0.0]); wd = k.get("worst_day",["",0.0])
bd_d, wd_d = bd[0], wd[0]
bd_r=_fmt2(float(bd[1])*100); wd_r=_fmt2(float(wd[1])*100)
sys = "You are a concise equity analyst who writes clear, neutral summaries."
usr = (f"Write a 2–3 sentence summary for {t} using ONLY: "
f"Return {tr}%, CAGR {cg}%, Vol {av}%, Sharpe {sh}, MaxDD {md}%, "
f"Best {bd_d} (+{bd_r}%), Worst {wd_d} (-{wd_r}%).")
try: summary = chat_summarize(sys, usr)
except Exception:
summary = (f"{t} delivered {tr}% 1Y return (vol {av}%, Sharpe {sh}). "
f"Max drawdown {md}%. Best day {bd_d} (+{bd_r}%), worst {wd_d} (-{wd_r}%).")
pros, cons = _pros_cons(k)
lines = []
lines.append(f"# {t} β€” Last 1Y Analysis")
lines.append(summary)
lines.append("\n## Key Metrics")
lines += [f"- Total Return: {tr}%", f"- CAGR: {cg}%", f"- Annualized Volatility: {av}%",
f"- Sharpe (rf={RISK_FREE_RATE:.2%}): {sh}", f"- Max Drawdown: {md}%",
f"- Best Day: {bd_d} (+{bd_r}%)", f"- Worst Day: {wd_d} (-{wd_r}%)"]
lines.append("\n## Monthly Returns")
lines.append(s.get("monthly_md","_No monthly table._"))
lines.append("\n## Pros"); lines += [f"- {p}" for p in pros]
lines.append("\n## Cons"); lines += [f"- {c}" for c in cons]
lines.append("\n### Data checks")
lines.append(f"- {s.get('neo4j_line','')}")
if s.get("facts_md","").strip():
lines.append("- Facts:"); lines += [ln for ln in s["facts_md"].splitlines()]
lines.append("\n*This is not financial advice.*")
return {"final_markdown": "\n".join(lines)}
wf_h = StateGraph(HistState)
wf_h.add_node("analyze", n_h_analyze); wf_h.add_node("monthly", n_h_monthly)
wf_h.add_node("neo4j", n_h_neo4j); wf_h.add_node("facts", n_h_facts); wf_h.add_node("final", n_h_write)
wf_h.set_entry_point("analyze"); wf_h.add_edge("analyze","monthly"); wf_h.add_edge("monthly","neo4j")
wf_h.add_edge("neo4j","facts"); wf_h.add_edge("facts","final"); wf_h.add_edge("final", END)
hist_agent = wf_h.compile()
# Helper to run by already-resolved ticker (ADDED)
def run_hist_agent_ticker(ticker: str) -> str:
out = hist_agent.invoke({"ticker": ticker})
return out.get("final_markdown","")
def run_hist_agent(user_input: str):
ticker = resolve_to_ticker(user_input)
out = hist_agent.invoke({"ticker": ticker})
return out.get("final_markdown",""), ticker
# Cell 6 β€” News Analysis Agent (FIXED)
from urllib.parse import urlparse
import math, json, re, requests
import pandas as pd
import faiss
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline as hfpipe
# ---- FinBERT for sentiment (shared for Portfolio too) ----
FINBERT_ID = "yiyanghkust/finbert-tone"
tok_snt = AutoTokenizer.from_pretrained(FINBERT_ID, use_fast=True)
mdl_snt = AutoModelForSequenceClassification.from_pretrained(FINBERT_ID, device_map="auto", torch_dtype="auto")
sentiment_pipe = hfpipe("text-classification", model=mdl_snt, tokenizer=tok_snt, top_k=None, truncation=True)
print("FinBERT ready:", FINBERT_ID)
# ---- Fetchers ----
from gnews import GNews
def fetch_news_newsapi(query: str, from_date: str, to_date: str, page_size=100, api_key: str = ""):
if not api_key:
return []
url = "https://newsapi.org/v2/everything"
params = {
"q": query,
"language": "en",
"from": from_date,
"to": to_date,
"sortBy": "publishedAt",
"pageSize": min(page_size, 100),
"apiKey": api_key,
}
try:
r = requests.get(url, params=params, timeout=15)
if r.status_code != 200:
return []
data = r.json()
except Exception:
return []
arts = []
for a in data.get("articles", []):
arts.append({
"title": a.get("title") or "",
"description": a.get("description") or "",
"content": a.get("content") or "",
"source": (a.get("source") or {}).get("name") or "",
"publishedAt": a.get("publishedAt") or "",
"url": a.get("url") or "",
})
return arts
def fetch_news_gnews(query: str, max_results=50):
g = GNews(language='en', country='US', period=f"{NEWS_LOOKBACK_DAYS}d", max_results=max_results)
try:
hits = g.get_news(query)
except Exception:
hits = []
out = []
for h in hits or []:
out.append({
"title": h.get("title") or "",
"description": h.get("description") or "",
"content": "",
"source": (h.get("publisher") or {}).get("title") or "",
"publishedAt": h.get("published date") or "",
"url": h.get("url") or "",
})
return out
def fetch_latest_news(company: str, ticker: str):
to_date = today_utc_date().isoformat()
from_date = days_ago(NEWS_LOOKBACK_DAYS).isoformat()
q = f'"{company}" OR {ticker}'
rows = []
if NEWSAPI_KEY:
rows.extend(fetch_news_newsapi(q, from_date, to_date, page_size=MAX_ARTICLES, api_key=NEWSAPI_KEY))
if not rows:
rows.extend(fetch_news_gnews(f"{company} {ticker}", max_results=MAX_ARTICLES))
if not rows:
return pd.DataFrame(columns=["title","description","content","source","publishedAt","url"])
df = pd.DataFrame(rows).fillna("")
def _to_ts(x):
try:
return pd.to_datetime(x, utc=True)
except Exception:
return pd.NaT
df["publishedAt"] = df["publishedAt"].apply(_to_ts)
df = (
df.dropna(subset=["title","url"])
.drop_duplicates(subset=["url"])
.drop_duplicates(subset=["title"])
.sort_values("publishedAt", ascending=False)
.head(MAX_ARTICLES)
.reset_index(drop=True)
)
return df
# ---- Filters & weights ----
DOMAIN_BLOCKLIST = {
"pypi.org","github.com","medium.com","substack.com","reddit.com",
"applech2.com","macupdate.com","investingideas.com","etfdailynews.com","marketbeat.com","gurufocus.com"
}
DOMAIN_QUALITY = {
"reuters.com": 1.5, "bloomberg.com": 1.5, "ft.com": 1.5, "wsj.com": 1.5, "cnbc.com": 1.4,
"barrons.com": 1.3, "forbes.com": 1.1, "theverge.com": 1.2, "techcrunch.com": 1.2,
"marketwatch.com": 1.0, "investors.com": 1.0, "yahoo.com": 1.0, "seekingalpha.com": 0.7,
}
# Extra disambiguation for tickers that are common words
AMBIGUOUS_TICKERS = {
"SPY": ["spdr", "s&p 500", "spdr s&p 500", "etf", "spdr s&p 500 etf trust", "nysearca:spy"],
}
def _domain(url: str):
try:
d = urlparse(url).netloc.lower()
return d[4:] if d.startswith("www.") else d
except Exception:
return ""
def _mostly_english(s: str) -> bool:
s = (s or "").strip()
if not s:
return True
ascii_ratio = sum(1 for ch in s if ord(ch) < 128) / max(1, len(s))
return ascii_ratio >= 0.85
def _company_keywords(company: str, ticker: str):
toks = re.findall(r"[A-Za-z0-9]+", company or "")
toks = [t for t in toks if len(t) > 2]
toks += [ticker.upper()]
return sorted(set(toks), key=str.lower)
def clean_filter_news(df: pd.DataFrame, company: str, ticker: str) -> pd.DataFrame:
if df.empty:
return df
df = df.copy()
df["domain"] = df["url"].map(_domain)
# Normalize Google News aggregator to original publisher (approx)
mask_g = df["domain"].str.contains("news.google", na=False)
df.loc[mask_g, "domain"] = (
df.loc[mask_g, "source"].fillna("").str.lower().str.replace(r"\s+", "", regex=True)
)
df = df[~df["domain"].isin(DOMAIN_BLOCKLIST)].copy()
kw = _company_keywords(company, ticker)
amb = AMBIGUOUS_TICKERS.get(ticker.upper())
def relevant(row):
text = f"{row.get('title','')} {row.get('description','')}".lower()
if not _mostly_english(text):
return False
if not any(k.lower() in text for k in kw):
return False
if amb and not any(a in text for a in amb):
return False
return True
df = df[df.apply(relevant, axis=1)].copy()
df["source_w"] = df["domain"].map(DOMAIN_QUALITY).fillna(0.9)
def rel_w(row):
text = f"{row.get('title','')} {row.get('description','')}".lower()
has_t = ticker.lower() in text
has_c = any(c.lower() in text for c in _company_keywords(company, ticker) if c.lower() != ticker.lower())
return 1.3 if (has_t and has_c) else (1.1 if (has_t or has_c) else 1.0)
df["rel_w"] = df.apply(rel_w, axis=1)
return df.reset_index(drop=True)
# ---- Sentiment aggregation ----
def sentiment_label_scores(text: str):
if not text.strip():
return "neutral", 0.0, 1.0, 0.0
out = sentiment_pipe(text[:512])[0]
probs = {d["label"].lower(): float(d["score"]) for d in out}
pos = probs.get("positive", 0.0)
neu = probs.get("neutral", 0.0)
neg = probs.get("negative", 0.0)
label = "positive" if pos > max(neu, neg) else ("negative" if neg > max(pos, neu) else "neutral")
return label, pos, neu, neg
def analyze_and_store_news(company: str, ticker: str):
df_raw = fetch_latest_news(company, ticker)
if df_raw.empty:
return {
"ticker": ticker, "company": company, "n_articles": 0,
"overall_label": "unknown", "overall_score": 0.0,
"pos_pct": 0.0, "neu_pct": 0.0, "neg_pct": 0.0, "df": df_raw
}
df = clean_filter_news(df_raw, company, ticker)
if df.empty:
return {
"ticker": ticker, "company": company, "n_articles": 0,
"overall_label": "unknown", "overall_score": 0.0,
"pos_pct": 0.0, "neu_pct": 0.0, "neg_pct": 0.0, "df": df
}
labels, pos_p, neu_p, neg_p, w_rec = [], [], [], [], []
now = pd.Timestamp.utcnow()
for _, r in df.iterrows():
text = (r["title"] + ". " + r.get("description","")).strip()
label, ppos, pneu, pneg = sentiment_label_scores(text)
labels.append(label)
pos_p.append(ppos)
neu_p.append(pneu)
neg_p.append(pneg)
age_days = max(0.0, (now - (r["publishedAt"] or now)).total_seconds() / 86400.0)
w_rec.append(math.exp(-0.25 * age_days))
df["label"] = labels
df["p_pos"] = pos_p
df["p_neu"] = neu_p
df["p_neg"] = neg_p
df["w_recency"] = w_rec
df["w_total"] = df["w_recency"] * df["source_w"] * df["rel_w"]
df["signed"] = df["w_total"] * (df["p_pos"] - df["p_neg"])
denom = df["w_total"].sum() + 1e-9
overall_score = df["signed"].sum() / denom
n = len(df)
pos_pct = (df["label"].eq("positive").sum() / n) * 100.0
neu_pct = (df["label"].eq("neutral").sum() / n) * 100.0
neg_pct = (df["label"].eq("negative").sum() / n) * 100.0
if overall_score > 0.10:
overall_label = "positive"
elif overall_score < -0.10:
overall_label = "negative"
else:
overall_label = "neutral"
if _have_neo4j_driver():
with driver.session() as session:
session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (s:Stock) REQUIRE s.symbol IS UNIQUE;")
session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (a:Article) REQUIRE a.url IS UNIQUE;")
session.run("MERGE (s:Stock {symbol:$s}) SET s.company=$c", s=ticker, c=company)
rows = df.to_dict(orient="records")
session.run(
"""
UNWIND $rows AS r
MERGE (a:Article {url:r.url})
SET a.title=r.title, a.source=r.source, a.publishedAt=toString(r.publishedAt),
a.label=r.label, a.p_pos=r.p_pos, a.p_neu=r.p_neu, a.p_neg=r.p_neg,
a.domain=r.domain, a.source_w=r.source_w, a.rel_w=r.rel_w
WITH a
MATCH (s:Stock {symbol:$s}) MERGE (s)-[:HAS_NEWS]->(a)
""",
rows=rows, s=ticker
)
return {
"ticker": ticker, "company": company, "n_articles": int(n),
"overall_label": overall_label, "overall_score": float(overall_score),
"pos_pct": float(pos_pct), "neu_pct": float(neu_pct), "neg_pct": float(neg_pct),
"df": df,
}
# ---- Retriever for snippets ----
class NewsRetriever:
def __init__(self, docs):
self.docs = docs
if not docs:
self.index = None
return
X = _embedder.encode(docs, convert_to_numpy=True, normalize_embeddings=True, batch_size=32)
self.index = faiss.IndexFlatIP(X.shape[1])
self.index.add(X)
self.X = X
def query(self, q, top_k=8):
if not self.index or not self.docs:
return []
qv = _embedder.encode([q], convert_to_numpy=True, normalize_embeddings=True)
D, I = self.index.search(qv, top_k)
hits = []
for j, i in enumerate(I[0]):
if i == -1:
continue
s = self.docs[i].replace("\n", " ").strip()
if len(s) > 220:
s = s[:217] + "..."
hits.append((s, float(D[0][j])))
return hits
# ---- Tools ----
from langchain.tools import tool
_GLOBAL_NEWS = {"latest": {}}
@tool
def fetch_analyze_news(ticker: str) -> str:
"""Resolve company, fetch & score news, write Neo4j, build retriever; return summary JSON."""
try:
name_candidates = yahoo_symbol_search(ticker)
company = (name_candidates[0].get("longname") or name_candidates[0].get("shortname")) if name_candidates else ticker
except Exception:
company = ticker
out = analyze_and_store_news(company, ticker)
df = out["df"]
docs = [(t + ". " + d).strip() for t, d in zip(df["title"].tolist(), df["description"].tolist())] if not df.empty else []
retriever = NewsRetriever(docs)
_GLOBAL_NEWS["latest"][ticker] = {"summary": out, "df": df, "retriever": retriever}
payload = {k: out[k] for k in ["ticker","company","n_articles","overall_label","overall_score","pos_pct","neu_pct","neg_pct"]}
return json.dumps(payload)
@tool
def show_sentiment_breakdown(ticker: str) -> str:
"""Markdown table of recent headlines (top 12)."""
if ticker not in _GLOBAL_NEWS["latest"]:
return "Run fetch_analyze_news first."
df = _GLOBAL_NEWS["latest"][ticker]["summary"]["df"]
if df.empty:
return "_No recent articles found._"
tbl = df[["publishedAt","domain","source","label","title"]].head(12).copy()
try:
tbl["publishedAt"] = pd.to_datetime(tbl["publishedAt"]).dt.strftime("%Y-%m-%d")
except Exception:
pass
return tbl.to_markdown(index=False)
@tool
def neo4j_check_news_count(ticker: str) -> str:
"""How many articles stored in Neo4j."""
if not _have_neo4j_driver():
return "Neo4j check skipped (ENABLE_NEO4J=False)."
with driver.session() as session:
res = session.run(
"MATCH (:Stock {symbol:$s})-[:HAS_NEWS]->(a:Article) RETURN count(a) AS c",
s=ticker
).single()
c = int(res["c"]) if res else 0
return f"Neo4j has {c} article nodes for {ticker}."
@tool("retrieve_news_evidence")
def retrieve_news_evidence_tool(query: str) -> str:
"""INPUT: 'TICKER | question' -> date Β· domain Β· snippet bullets."""
if "|" in query:
ticker, question = [x.strip() for x in query.split("|", 1)]
else:
ticker, question = query.strip(), "latest sentiment drivers"
if ticker not in _GLOBAL_NEWS["latest"]:
return "Run fetch_analyze_news first."
retriever = _GLOBAL_NEWS["latest"][ticker]["retriever"]
hits = retriever.query(question, top_k=6) if retriever else []
if not hits:
return "_No evidence available._"
# attach meta (date/domain) if we can match
df = _GLOBAL_NEWS["latest"][ticker]["summary"]["df"]
meta = {}
for _, r in df.iterrows():
key = (r["title"] + ". " + r.get("description","")).strip()
meta[key] = {
"date": (pd.to_datetime(r["publishedAt"]).strftime("%Y-%m-%d") if pd.notna(r["publishedAt"]) else ""),
"domain": r.get("domain",""),
}
bullets = []
for txt, _ in hits:
m = meta.get(txt, {})
bullets.append(f"- {m.get('date','')} Β· {m.get('domain','')} Β· {txt}")
return "\n".join(bullets)
# ---- LangGraph flow ----
from langgraph.graph import StateGraph, END
from typing import TypedDict
class NewsState(TypedDict, total=False):
ticker: str
fetch_json: str
breakdown_md: str
neo4j_line: str
evidence_md: str
final_markdown: str
def n_n_fetch(s: NewsState) -> NewsState:
return {"fetch_json": fetch_analyze_news.invoke(s["ticker"])}
def n_n_breakdown(s: NewsState) -> NewsState:
return {"breakdown_md": show_sentiment_breakdown.invoke(s["ticker"])}
def n_n_neo(s: NewsState) -> NewsState:
return {"neo4j_line": neo4j_check_news_count.invoke(s["ticker"])}
def n_n_evidence(s: NewsState) -> NewsState:
q = f"{s['ticker']} | biggest drivers of sentiment"
return {"evidence_md": retrieve_news_evidence_tool.invoke(q)}
def _pros_cons_from_summary(js):
pros, cons = [], []
label = js.get("overall_label","neutral")
pos = float(js.get("pos_pct",0))
neu = float(js.get("neu_pct",0))
neg = float(js.get("neg_pct",0))
if label == "positive":
pros.append("Net positive media tone in the recent period.")
if pos >= 40:
pros.append("High share of positive headlines.")
if neu >= 40:
pros.append("Balanced coverage (many neutral headlines).")
if label == "negative":
cons.append("Net negative media tone in the recent period.")
if neg >= 40:
cons.append("High share of negative headlines.")
if pos <= 20:
cons.append("Few positive headlines recently.")
if not pros:
pros.append("No strong positive skew detected.")
if not cons:
cons.append("No strong negative skew detected.")
return pros, cons
def n_n_write(s: NewsState) -> NewsState:
try:
js = json.loads(s.get("fetch_json","{}"))
except Exception:
js = {}
t = s["ticker"]
label = js.get("overall_label","neutral")
score = float(js.get("overall_score",0.0))
pos = float(js.get("pos_pct",0.0))
neu = float(js.get("neu_pct",0.0))
neg = float(js.get("neg_pct",0.0))
# Safer prompt that does not invent metrics
sys = (
"You are a cautious summarizer. Use ONLY the provided numbers: overall_label, overall_score, "
"pos_pct, neu_pct, neg_pct. Do not invent or reinterpret metrics (e.g., do not call a percent a score), "
"and do not mention returns."
)
usr = (
f"Write a 2–3 sentence summary for {t}. "
f"Overall={label}, Score={score:.2f}, Mix: +{pos:.1f}% / neutral {neu:.1f}% / -{neg:.1f}%."
)
try:
summary = chat_summarize(sys, usr)
except Exception:
summary = (
f"Coverage for {t} appears {label}. "
f"Headline mix: {pos:.1f}% positive, {neu:.1f}% neutral, {neg:.1f}% negative (score {score:.2f})."
)
pros, cons = _pros_cons_from_summary(js)
lines = []
lines.append(f"# {t} β€” Current News Sentiment ({NEWS_LOOKBACK_DAYS}d)")
lines.append(summary)
lines.append("\n## Sentiment Snapshot")
lines.append(f"- **Overall:** {label} (score: {score:.2f})")
lines.append(f"- **Headline mix:** {pos:.1f}% positive Β· {neu:.1f}% neutral Β· {neg:.1f}% negative")
lines.append("\n## Recent Headlines (sample)")
lines.append(s.get("breakdown_md","_No headlines._"))
lines.append("\n## Evidence (semantic matches)")
lines.append(s.get("evidence_md","_No evidence._"))
lines.append("\n## Pros (based on tone)")
lines += [f"- {p}" for p in pros]
lines.append("\n## Cons (based on tone)")
lines += [f"- {c}" for c in cons]
lines.append("\n### Data Checks")
lines.append(f"- {s.get('neo4j_line','')}")
lines.append("\n*This is not financial advice.*")
return {"final_markdown": "\n".join(lines)}
wf_n = StateGraph(NewsState)
wf_n.add_node("fetch", n_n_fetch)
wf_n.add_node("breakdown", n_n_breakdown)
wf_n.add_node("neo", n_n_neo)
wf_n.add_node("evidence", n_n_evidence)
wf_n.add_node("final", n_n_write)
wf_n.set_entry_point("fetch")
wf_n.add_edge("fetch","breakdown")
wf_n.add_edge("breakdown","neo")
wf_n.add_edge("neo","evidence")
wf_n.add_edge("evidence","final")
wf_n.add_edge("final", END)
news_agent = wf_n.compile()
# Helper to run by already-resolved ticker
def run_news_agent_ticker(ticker: str) -> str:
out = news_agent.invoke({"ticker": ticker})
return out.get("final_markdown","")
def run_news_agent(user_input: str):
ticker = resolve_to_ticker(user_input)
out = news_agent.invoke({"ticker": ticker})
return out.get("final_markdown",""), ticker
# Cell 7 β€” Portfolio Optimization Agent
from scipy.optimize import minimize
import yfinance as yf
_P_GLOBAL = {"latest": {}}
CORE_ETFS = ["SPY","VTI","VXUS","BND"]
WMAX = 0.30
MIN_W_SOFT = 0.03
LAMBDA_CONCEN = 0.02
MAX_TICKERS_TOTAL = 30
_STOPWORDS = {"I","A","AN","AND","ARE","AM","AS","AT","BE","BY","CAN","FOR","FROM","HAD","HAS","HAVE","HE","HER",
"HIM","HIS","IF","IN","INTO","IS","IT","ITS","ME","MY","OF","ON","OR","OUR","SO","SHE","THAT","THE","THEIR",
"THEM","THEN","THERE","THESE","THEY","THIS","TO","UP","US","WAS","WE","WITH","YOU","YOUR","FEW","MANY","MOST",
"SOME","ANY","ALL"}
def extract_tickers(text: str):
raw = re.findall(r"\b[A-Z]{1,5}(?:\.[A-Z])?\b", text.upper())
cands = sorted(set(raw))
validated = []
try:
for c in cands:
m = yahoo_symbol_search(c)
if m and any(d["symbol"].upper()==c for d in m):
validated.append(c)
except Exception:
pass
if validated:
return validated[:MAX_TICKERS_TOTAL]
return [c for c in cands if c not in _STOPWORDS][:MAX_TICKERS_TOTAL]
CATEGORY_MAP = {
"megacap tech": ["AAPL","MSFT","GOOGL","AMZN","NVDA","META"],
"semiconductors": ["NVDA","AMD","AVGO","QCOM","TSM","INTC"],
"cloud saas": ["CRM","NOW","ADBE","ORCL","DDOG","SNOW"],
"ai": ["NVDA","MSFT","GOOGL","AMZN","META","AVGO"],
"ev": ["TSLA","RIVN","LCID","NIO","GM","F"],
"banks": ["JPM","BAC","WFC","C","GS","MS"],
"healthcare": ["UNH","JNJ","PFE","MRK","LLY","ABBV"],
"staples": ["PG","KO","PEP","WMT","COST","MDLZ"],
"energy": ["XOM","CVX","COP","SLB","EOG","PSX"],
"industrials": ["CAT","BA","UNP","GE","HON","DE"],
"utilities": ["NEE","DUK","SO","D","AEP","EXC"],
"reit": ["PLD","AMT","CCI","SPG","O","EQIX"],
"broad etf": ["SPY","VTI","QQQ","VOO","VXUS","BND"],
}
def detect_category(text: str):
t = text.lower()
for k in CATEGORY_MAP:
if k in t: return k
if "tech" in t: return "megacap tech"
if "semis" in t: return "semiconductors"
if "staple" in t: return "staples"
return ""
def resolve_input(user_text: str):
tix = extract_tickers(user_text)
cat = detect_category(user_text)
if tix: return sorted(set(tix))[:MAX_TICKERS_TOTAL], cat
if cat: return [], cat
token = re.sub(r"can i invest in|suggest|recommend|stocks|portfolio|optimi[sz]e", "", user_text, flags=re.I).strip()
if token:
m = yahoo_symbol_search(token)
if m: return [m[0]["symbol"]], ""
return [], ""
def fetch_prices(tickers, lookback_days=PRICE_LOOKBACK_DAYS):
if not tickers: return pd.DataFrame()
end = today_utc_date()
start = end - timedelta(days=lookback_days + 10)
try:
batch_raw = yf.download(tickers, start=start.isoformat(), end=end.isoformat(),
auto_adjust=False, group_by="column", progress=False, threads=True)
if isinstance(batch_raw, pd.DataFrame):
adj = batch_raw["Adj Close"] if "Adj Close" in batch_raw.columns else batch_raw["Close"]
if isinstance(adj, pd.Series): adj = adj.to_frame()
df = adj.dropna(how="all").ffill().dropna()
cols = [c for c in tickers if c in df.columns]
df = df[cols]
long_enough = [c for c in df.columns if df[c].dropna().shape[0] >= 60]
df = df[long_enough]
else:
df = pd.DataFrame()
except Exception:
df = pd.DataFrame()
if df.empty or df.shape[1] < 1:
series_map = {}
for t in tickers:
try:
r = yf.download(t, start=start.isoformat(), end=end.isoformat(),
auto_adjust=False, progress=False)
if r.empty: continue
adj = r.get("Adj Close", r.get("Close"))
if adj is None or adj.empty: continue
adj = adj.dropna().ffill()
if adj.shape[0] < 60: continue
series_map[t] = adj
except Exception: continue
if series_map:
df = pd.DataFrame(series_map).dropna(how="all").ffill().dropna()
else:
df = pd.DataFrame()
return df
def compute_risk_metrics(price_df: pd.DataFrame):
if price_df.empty: return {"metrics": pd.DataFrame(), "corr": pd.DataFrame(), "rets": pd.DataFrame()}
rets = price_df.pct_change().dropna()
if rets.empty: return {"metrics": pd.DataFrame(), "corr": pd.DataFrame(), "rets": pd.DataFrame()}
ann_ret = (1 + rets.mean())**252 - 1
ann_vol = rets.std() * np.sqrt(252)
sharpe = (ann_ret - RISK_FREE_RATE) / (ann_vol + 1e-12)
metrics = pd.DataFrame({"AnnReturn%": (ann_ret*100).round(2),
"AnnVol%": (ann_vol*100).round(2),
"Sharpe": sharpe.round(2)}).sort_values("AnnReturn%", ascending=False)
corr = rets.corr()
return {"metrics": metrics, "corr": corr, "rets": rets}
# ---- Ticker-level news sentiment (uses FinBERT we already loaded) ----
def fetch_sentiment_for_ticker(ticker: str):
to_date = today_utc_date().isoformat()
from_date = days_ago(NEWS_LOOKBACK_DAYS).isoformat()
rows = []
if NEWSAPI_KEY:
url = "https://newsapi.org/v2/everything"
params = {"q": ticker, "language":"en", "from":from_date, "to":to_date,
"sortBy":"publishedAt", "pageSize": min(MAX_NEWS_PER_TICKER,100), "apiKey": NEWSAPI_KEY}
try:
r = requests.get(url, params=params, timeout=15)
if r.status_code==200:
data = r.json()
for a in data.get("articles", []):
rows.append({"title": a.get("title") or "", "description": a.get("description") or "",
"source": (a.get("source") or {}).get("name") or "",
"publishedAt": a.get("publishedAt") or "", "url": a.get("url") or ""})
except Exception:
pass
if not rows:
g = GNews(language='en', country='US', period=f"{NEWS_LOOKBACK_DAYS}d", max_results=MAX_NEWS_PER_TICKER)
try:
hits = g.get_news(ticker)
for h in hits or []:
rows.append({"title": h.get("title") or "", "description": h.get("description") or "",
"source": (h.get("publisher") or {}).get("title") or "",
"publishedAt": h.get("published date") or "", "url": h.get("url") or ""})
except Exception: pass
if not rows:
return {"ticker": ticker, "n_articles": 0, "overall_label": "unknown", "overall_score": 0.0, "df": pd.DataFrame()}
df = pd.DataFrame(rows).fillna("")
def _to_ts(x):
try: return pd.to_datetime(x, utc=True)
except: return pd.NaT
df["publishedAt"] = df["publishedAt"].apply(_to_ts)
df = df.dropna(subset=["title","url"]).drop_duplicates(subset=["url"]).drop_duplicates(subset=["title"]).copy()
df = df.sort_values("publishedAt", ascending=False).head(MAX_NEWS_PER_TICKER).reset_index(drop=True)
labels,pos_p,neu_p,neg_p,w = [],[],[],[],[]
now = pd.Timestamp.utcnow()
for _, r in df.iterrows():
text = (r["title"] + ". " + r.get("description","")).strip()
if not text:
label, ppos, pneu, pneg = "neutral", 0.0, 1.0, 0.0
else:
out = sentiment_pipe(text[:512])[0]
probs = {d["label"].lower(): float(d["score"]) for d in out}
ppos, pneu, pneg = probs.get("positive",0.0), probs.get("neutral",0.0), probs.get("negative",0.0)
label = "positive" if ppos>max(pneu,pneg) else ("negative" if pneg>max(ppos,pneu) else "neutral")
age_days = max(0.0, (now - (r["publishedAt"] or now)).total_seconds()/86400.0)
w.append(math.exp(-0.25 * age_days))
labels.append(label); pos_p.append(ppos); neu_p.append(pneu); neg_p.append(pneg)
df["label"]=labels; df["p_pos"]=pos_p; df["p_neu"]=neu_p; df["p_neg"]=neg_p; df["w"]=w
df["signed"] = df["w"] * (df["p_pos"] - df["p_neg"])
score = df["signed"].sum()/(df["w"].sum()+1e-9)
n = len(df)
pos_pct = (df["label"].eq("positive").sum()/n)*100.0
neu_pct = (df["label"].eq("neutral").sum()/n)*100.0
neg_pct = (df["label"].eq("negative").sum()/n)*100.0
label = "positive" if score>0.10 else ("negative" if score<-0.10 else "neutral")
return {"ticker": ticker, "n_articles": n, "overall_label": label, "overall_score": float(score),
"pos_pct": float(pos_pct), "neu_pct": float(neu_pct), "neg_pct": float(neg_pct), "df": df}
# ---- FAISS facts for portfolio evidence ----
class FactRetrieverP:
def __init__(self, facts):
self.facts = facts
if not facts: self.index=None; return
X = _embedder.encode(facts, convert_to_numpy=True, normalize_embeddings=True, batch_size=64)
self.index = faiss.IndexFlatIP(X.shape[1]); self.index.add(X)
def query(self, q, top_k=8):
if not self.index or not self.facts: return []
qv = _embedder.encode([q], convert_to_numpy=True, normalize_embeddings=True)
D, I = self.index.search(qv, top_k)
return [(self.facts[i], float(D[0][j])) for j, i in enumerate(I[0])]
# ---- Neo4j snapshot (optional) ----
def neo4j_store_snapshot(tickers, metrics_df, sentiments):
if not _have_neo4j_driver():
return "Neo4j write skipped (ENABLE_NEO4J=False)."
md = metrics_df.rename(columns={"AnnReturn%":"AnnReturn","AnnVol%":"AnnVol"}).copy()
rows_metrics = md.reset_index().rename(columns={"index":"Ticker"}).to_dict(orient="records")
rows_sent = []
for t, js in sentiments.items():
rows_sent.append({"ticker": t, "label": js.get("overall_label","unknown"),
"score": float(js.get("overall_score",0.0)),
"pos_pct": float(js.get("pos_pct",0.0)),
"neu_pct": float(js.get("neu_pct",0.0)),
"neg_pct": float(js.get("neg_pct",0.0))})
with driver.session() as session:
session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (s:Stock) REQUIRE s.symbol IS UNIQUE")
session.run("""
UNWIND $rows AS r
MERGE (s:Stock {symbol:r.Ticker})
SET s.AnnReturn=toFloat(r.AnnReturn), s.AnnVol=toFloat(r.AnnVol), s.Sharpe=toFloat(r.Sharpe)
""", rows=rows_metrics)
session.run("""
UNWIND $rows AS r
MATCH (s:Stock {symbol:r.ticker})
MERGE (s)-[rel:HAS_SENTIMENT]->(m:Sentiment {date: date()})
SET rel.label=r.label, rel.score=r.score, rel.pos_pct=r.pos_pct, rel.neu_pct=r.neu_pct, rel.neg_pct=r.neg_pct
""", rows=rows_sent)
return f"Wrote {len(rows_metrics)} metric nodes and {len(rows_sent)} sentiment relations."
# ---- Tools ----
from langchain.tools import tool
@tool
def build_universe(input_text: str) -> str:
"""Build the initial security universe from free text.
Input: free-form sentence with tickers and/or a theme (e.g., "optimize AAPL MSFT TSLA" or "semiconductors").
Returns: JSON string {"holdings": [...], "category": "<theme|''>", "universe": [...]}
"""
holdings, category = resolve_input(input_text)
universe = set()
if holdings:
universe.update(holdings); universe.update(CORE_ETFS)
if category: universe.update(CATEGORY_MAP.get(category, []))
elif category:
universe.update(CATEGORY_MAP.get(category, [])); universe.update(CORE_ETFS)
else:
universe.update(CORE_ETFS + ["AAPL","MSFT","NVDA","AMZN"])
universe = sorted(list(universe))[:MAX_TICKERS_TOTAL]
_P_GLOBAL["latest"]["holdings"] = holdings
_P_GLOBAL["latest"]["category"] = category
_P_GLOBAL["latest"]["universe"] = universe
return json.dumps({"holdings": holdings, "category": category, "universe": universe})
def _avg_corr_to_holdings(corr: pd.DataFrame, holding_tix, t):
if not isinstance(corr, pd.DataFrame) or corr.empty or not holding_tix: return np.nan
vals = []
for h in holding_tix:
if (t in corr.index) and (h in corr.columns):
try: vals.append(abs(float(corr.loc[t, h])))
except Exception: pass
return float(np.mean(vals)) if vals else np.nan
@tool
def score_universe(_: str="") -> str:
"""Score the universe by diversification & news tone; compute risk tables and store snapshot.
Uses correlation vs. current holdings and FinBERT news sentiment to rank candidates.
Side effects: stores metrics/sentiment to Neo4j (if enabled).
Returns: JSON string {"n_universe": int, "n_holdings": int, "top_candidates": [...], "neo4j": "<msg>"}
"""
universe = _P_GLOBAL["latest"].get("universe", [])
holdings = _P_GLOBAL["latest"].get("holdings", [])
if not universe: return json.dumps({"error":"empty universe"})
px = fetch_prices(universe, PRICE_LOOKBACK_DAYS)
if px.empty: return json.dumps({"error":"no price data"})
risk = compute_risk_metrics(px)
metrics, corr, rets = risk["metrics"], risk["corr"], risk["rets"]
sentiments = {}
for t in universe:
try: sentiments[t] = fetch_sentiment_for_ticker(t)
except Exception: sentiments[t] = {"ticker": t, "n_articles": 0, "overall_label": "unknown", "overall_score": 0.0}
scores = {}
for t in universe:
avg_corr = _avg_corr_to_holdings(corr, holdings, t)
sent = float(sentiments[t].get("overall_score", 0.0))
scores[t] = 0.6 * (1.0 - (0.0 if np.isnan(avg_corr) else avg_corr)) + 0.4 * ((sent + 1.0) / 2.0)
_P_GLOBAL["latest"].update({"px": px, "metrics": metrics, "corr": corr, "rets": rets,
"sentiments": sentiments, "scores": scores})
facts = []
for t in metrics.index:
r = metrics.loc[t]; s = sentiments[t]
facts.append(f"{t} annual return: {r['AnnReturn%']:.2f}%")
facts.append(f"{t} annual volatility: {r['AnnVol%']:.2f}%")
facts.append(f"{t} Sharpe ratio: {r['Sharpe']:.2f}")
facts.append(f"{t} news sentiment score (recent): {s.get('overall_score',0.0):.3f} label {s.get('overall_label','unknown')}")
_P_GLOBAL["latest"]["retriever"] = FactRetrieverP(facts)
universe_ranked = sorted(universe, key=lambda x: scores.get(x,0.0), reverse=True)
extras = [t for t in universe_ranked if t not in holdings]
need = max(5, 8 - len(holdings)) if len(holdings)==0 else max(0, 8 - len(holdings))
recs = extras[:need]
neo_msg = neo4j_store_snapshot(universe, metrics, sentiments)
payload = {"n_universe": len(universe), "n_holdings": len(holdings),
"top_candidates": recs, "neo4j": neo_msg}
return json.dumps(payload)
def _mean_var_opt(rets_df: pd.DataFrame, risk_free=RISK_FREE_RATE, wmax=WMAX, lambda_conc=LAMBDA_CONCEN):
R = rets_df.values
if R.shape[0] < 40: raise RuntimeError("Too little data for optimization.")
mu = np.mean(R, axis=0) * 252.0
Sigma = np.cov(R, rowvar=False) * 252.0
Sigma = Sigma + np.eye(Sigma.shape[0]) * 1e-6
N = len(mu)
x0 = np.ones(N)/N
def neg_sharpe(w):
vol = np.sqrt(max(1e-12, w @ Sigma @ w))
ret = w @ mu
return - (ret - risk_free) / vol
def objective(w): return neg_sharpe(w) + lambda_conc * np.sum(w**2)
min_w = MIN_W_SOFT if (N * MIN_W_SOFT) < 1.0 else 0.0
bounds = [(min_w, wmax)] * N
cons = [{"type":"eq","fun": lambda w: np.sum(w) - 1.0}]
res = minimize(objective, x0, method="SLSQP", bounds=bounds, constraints=cons,
options={"maxiter":700,"ftol":1e-9,"disp":False})
if (not res.success) or (np.any(np.isnan(res.x))):
raise RuntimeError("SLSQP failed.")
w = res.x
w[w < 1e-3] = 0.0; w = w / (w.sum() + 1e-12)
vol = float(np.sqrt(max(1e-12, w @ Sigma @ w)))
ret = float(w @ mu)
sharpe = (ret - risk_free) / (vol + 1e-12)
return w, ret, vol, sharpe
@tool
def optimize_portfolio(objective: str="max_sharpe") -> str:
"""Optimize portfolio weights (max Sharpe with caps & soft-min weights).
Uses mean-variance with per-asset cap (default 30%) and light concentration penalty.
Returns: Markdown table with weights (%) keyed by ticker.
"""
holdings = _P_GLOBAL["latest"].get("holdings", [])
scores = _P_GLOBAL["latest"].get("scores", {})
px = _P_GLOBAL["latest"].get("px", pd.DataFrame())
if px.empty: return "_No data for optimization._"
ranked = sorted(scores, key=lambda t: scores[t], reverse=True)
chosen = list(holdings)
for t in ranked:
if t not in chosen: chosen.append(t)
if len(chosen) >= min(12, len(ranked)): break
tickers = [t for t in chosen if t in px.columns]
sub_px = px[tickers].dropna()
if sub_px.empty: return "_No overlapping price history._"
rets = sub_px.pct_change().dropna()
try:
w, ann_ret, ann_vol, sharpe = _mean_var_opt(rets)
weights = pd.Series(w, index=tickers)
_P_GLOBAL["latest"]["weights"] = dict(zip(tickers, weights.tolist()))
_P_GLOBAL["latest"]["opt_summary"] = {"AnnReturn%": ann_ret*100, "AnnVol%": ann_vol*100, "Sharpe": sharpe}
tbl = (weights*100).round(2).astype(str) + "%"
return tbl.sort_values(ascending=False).to_frame("Weight").to_markdown()
except Exception:
iv = 1.0 / (rets.std() + 1e-9)
w = iv / iv.sum()
w = np.minimum(w, WMAX); w = w / w.sum()
_P_GLOBAL["latest"]["weights"] = {t: float(w[t]) for t in w.index}
tbl = (w*100).round(2).astype(str) + "%"
return tbl.sort_values(ascending=False).to_frame("Weight").to_markdown()
@tool
def show_metrics_table(_: str="") -> str:
"""Return a per-ticker risk & tone table.
Columns: AnnReturn%, AnnVol%, Sharpe, SentScore, SentLabel. Markdown formatted.
"""
metrics = _P_GLOBAL["latest"].get("metrics", pd.DataFrame()).copy()
sentiments = _P_GLOBAL["latest"].get("sentiments", {})
if metrics.empty: return "_No metrics available._"
metrics["SentScore"] = [float(sentiments.get(t, {}).get("overall_score", 0.0)) for t in metrics.index]
metrics["SentLabel"] = [sentiments.get(t, {}).get("overall_label", "unknown") for t in metrics.index]
return metrics[["AnnReturn%","AnnVol%","Sharpe","SentScore","SentLabel"]].to_markdown()
@tool("retrieve_evidence")
def retrieve_evidence_tool(query: str) -> str:
"""Retrieve semantic facts collected during scoring to justify suggestions."""
retr = _P_GLOBAL["latest"].get("retriever", None)
if not retr: return "_No facts available._"
hits = retr.query(query, top_k=8)
return "\n".join([f"- {txt}" for txt, _ in hits]) if hits else "_No facts available._"
# ---- LangGraph flow ----
from langgraph.graph import StateGraph, END
from typing import TypedDict
class PortState(TypedDict, total=False):
user_text: str
universe_json: str
score_json: str
weights_md: str
metrics_md: str
evidence_md: str
final_md: str
def n_p_uni(s: PortState) -> PortState: return {"universe_json": build_universe.invoke(s["user_text"])}
def n_p_score(s: PortState) -> PortState: return {"score_json": score_universe.invoke("")}
def n_p_opt(s: PortState) -> PortState: return {"weights_md": optimize_portfolio.invoke("max_sharpe")}
def n_p_metrics(s: PortState) -> PortState: return {"metrics_md": show_metrics_table.invoke("")}
def n_p_evid(s: PortState) -> PortState: return {"evidence_md": retrieve_evidence_tool.invoke("diversification and risk drivers")}
def _corr_bucket(x: float) -> str:
if np.isnan(x): return "unknown"
if x < 0.30: return "low"
if x < 0.60: return "medium"
return "high"
def n_p_write(s: PortState) -> PortState:
try: uni = json.loads(s.get("universe_json","{}"))
except: uni = {}
try: summ = json.loads(s.get("score_json","{}"))
except: summ = {}
holdings = uni.get("holdings", []) or []
recs = summ.get("top_candidates", []) or []
corr = _P_GLOBAL["latest"].get("corr", pd.DataFrame())
sentiments = _P_GLOBAL["latest"].get("sentiments", {})
rows = []
for t in recs:
avgc = _avg_corr_to_holdings(corr, holdings, t)
snt = sentiments.get(t, {})
rows.append({"Ticker": t,
"AvgAbsCorrToHoldings": (None if np.isnan(avgc) else round(avgc,2)),
"CorrBucket": _corr_bucket(avgc),
"SentLabel": snt.get("overall_label","unknown"),
"SentScore": round(float(snt.get("overall_score",0.0)),2)})
df_add = pd.DataFrame(rows)
if df_add.empty:
summary = ("No strong additions identified from the current universe. "
"Consider widening the universe or relaxing constraints to unlock diversification options.")
else:
order = {"low":0,"medium":1,"high":2,"unknown":3}
df_rank = df_add.sort_values(by=["CorrBucket","SentScore"],
key=lambda col: col.map(order) if col.name=="CorrBucket" else col,
ascending=[True, False])
top_names = df_rank["Ticker"].tolist()[:3]
low_n = (df_add["CorrBucket"]=="low").sum(); med_n = (df_add["CorrBucket"]=="medium").sum()
pos_n = (df_add["SentLabel"]=="positive").sum(); neg_n = (df_add["SentLabel"]=="negative").sum()
s1 = f"Suggested additions to complement {', '.join(holdings) if holdings else 'your portfolio'}: {', '.join(recs)}."
s2 = f"These tilt toward lower correlation (low={low_n}, medium={med_n}); top low-corr picks: {', '.join(top_names) if top_names else 'β€”'}."
s3 = f"Recent news tone for additions skews {('positive' if pos_n>=neg_n else 'mixed')} (pos={pos_n}, neg={neg_n})."
summary = s1 + " " + s2 + " " + s3
opt = _P_GLOBAL["latest"].get("opt_summary", {})
perf_line = f"\n**Opt. Stats** β€” Ann. Return: {opt.get('AnnReturn%',0):.2f}% Β· Ann. Vol: {opt.get('AnnVol%',0):.2f}% Β· Sharpe: {opt.get('Sharpe',0):.2f}" if opt else ""
lines = []
lines.append("# Portfolio Optimization β€” Suggestions & Risk Analysis")
lines.append(summary + perf_line)
lines.append("\n## Recommended Additions")
lines.append("- " + ", ".join(recs) if recs else "_No strong additions identified._")
lines.append("\n## Optimized Weights (cap 30%)")
lines.append(s.get("weights_md","_No optimization result._"))
lines.append("\n## Per-Ticker Risk & Sentiment")
lines.append(s.get("metrics_md","_No metrics._"))
lines.append("\n## Evidence (facts retrieved)")
lines.append(s.get("evidence_md","_No facts available._"))
lines.append("\n### Data Checks")
lines.append("- Neo4j snapshot written." if _have_neo4j_driver() else "- Neo4j write skipped (disabled).")
lines.append("\n*This is not financial advice.*")
return {"final_md": "\n".join(lines)}
wf_p = StateGraph(PortState)
wf_p.add_node("universe", n_p_uni); wf_p.add_node("score", n_p_score)
wf_p.add_node("opt", n_p_opt); wf_p.add_node("metrics", n_p_metrics)
wf_p.add_node("evidence", n_p_evid); wf_p.add_node("write", n_p_write)
wf_p.set_entry_point("universe"); wf_p.add_edge("universe","score"); wf_p.add_edge("score","opt")
wf_p.add_edge("opt","metrics"); wf_p.add_edge("metrics","evidence"); wf_p.add_edge("evidence","write")
wf_p.add_edge("write", END)
port_agent = wf_p.compile()
def run_port_agent(user_text: str):
out = port_agent.invoke({"user_text": user_text})
return out.get("final_md","")
# Cell 8 β€” Supervisor: route or consolidate (UPDATED)
def _looks_like_single_ticker(text: str) -> bool:
t = _clean_text(text)
toks = re.findall(r"[A-Z]{1,6}", t.upper())
return len(toks) == 1 and len(t.strip().split()) <= 4
def _intent_router(user_text: str) -> str:
t = (_clean_text(user_text)).lower()
if any(k in t for k in ["optimize","weight","weights","allocation","diversify","portfolio","rebalance"]):
return "portfolio"
if any(k in t for k in ["news","headline","sentiment","media","press","article"]):
return "news"
if any(k in t for k in ["trend","historical","drawdown","sharpe","volatility","last year","1y","price history"]):
return "historical"
# default behavior: single name/ticker -> consolidated
if _looks_like_single_ticker(user_text) or len(_clean_text(user_text).split()) <= 4:
return "consolidated"
return "consolidated"
def supervisor_respond(user_text: str) -> str:
intent = _intent_router(user_text)
try:
if intent == "historical":
md, tk = run_hist_agent(user_text)
return f"## Supervisor β€” Routed to Historical ({tk})\n\n{md}"
elif intent == "news":
md, tk = run_news_agent(user_text)
return f"## Supervisor β€” Routed to News ({tk})\n\n{md}"
elif intent == "portfolio":
if _looks_like_single_ticker(user_text):
tkr = resolve_to_ticker(user_text)
user_text = f"I have invested in {tkr}. Suggest a few stocks to diversify my portfolio."
md = run_port_agent(user_text)
return f"## Supervisor β€” Routed to Portfolio\n\n{md}"
else: # consolidated
tk = resolve_to_ticker(user_text)
hist_md, _ = run_hist_agent(tk)
news_md, _ = run_news_agent(tk)
port_prompt = f"I have invested in {tk}. Suggest a few stocks to diversify my portfolio."
port_md = run_port_agent(port_prompt)
return (
f"# Consolidated View for {tk}\n"
f"\n---\n\n{hist_md}\n\n---\n\n{news_md}\n\n---\n\n{port_md}"
)
except Exception as e:
return f"**Supervisor error:** {e}"
# Cell 9 β€” Gradio app (single box; supervisor decides)
import gradio as gr
APP_DESC = """Type a ticker (e.g., **AAPL**) for a consolidated view (Historical β†’ News β†’ Portfolio),
or ask specifically for **news**, **historical trends**, or **portfolio optimization** and the supervisor will route it."""
def chat_fn(message, history):
return supervisor_respond(message)
demo = gr.ChatInterface(
fn=chat_fn,
title="πŸ“Š Multi-Agent Equity Analyst (Historical + News + Portfolio)",
description=APP_DESC,
textbox=gr.Textbox(placeholder="e.g., AAPL | 'news on MSFT' | 'optimize my portfolio AAPL MSFT TSLA'"),
cache_examples=False
)
demo.launch()
######################################################################################################################################
# === Minimal Offline Evaluation (20 tests) β€” Only 3 "Very Good" Metrics & Suppressed Warnings ===
import re, time, numpy as np, pandas as pd, warnings, logging
# Silence warnings and common noisy loggers
warnings.filterwarnings("ignore")
for name in ["yfinance", "neo4j", "neo4j.notifications", "neo4j.security", "neo4j.io", "urllib3"]:
try:
lg = logging.getLogger(name)
lg.setLevel(logging.CRITICAL)
lg.propagate = False
lg.disabled = True
except Exception:
pass
# ---------- helpers (same logic; trimmed outputs) ----------
def _parse_route_and_ticker(md: str):
first = md.strip().splitlines()[0] if md.strip() else ""
route, ticker = "unknown", ""
if first.startswith("## Supervisor β€” Routed to Historical"):
route = "historical"; m = re.search(r"\(([^)]+)\)", first); ticker = (m.group(1) if m else "")
elif first.startswith("## Supervisor β€” Routed to News"):
route = "news"; m = re.search(r"\(([^)]+)\)", first); ticker = (m.group(1) if m else "")
elif first.startswith("## Supervisor β€” Routed to Portfolio"):
route = "portfolio"
elif first.startswith("# Consolidated View for"):
route = "consolidated"; m = re.search(r"# Consolidated View for\s+([A-Z]{1,6})", first); ticker = (m.group(1) if m else "")
if not ticker:
m = re.search(r"#\s+([A-Z]{1,6})\s+β€”\s+Last 1Y Analysis", md)
if m: ticker = m.group(1)
if not ticker:
m = re.search(r"#\s+([A-Z]{1,6})\s+β€”\s+Current News Sentiment", md)
if m: ticker = m.group(1)
return route, ticker
def _extract_kpis_from_md(md: str):
pats = {
"Total Return": r"Total Return:\s*([-+]?\d+(?:\.\d+)?)\s*%",
"CAGR": r"CAGR:\s*([-+]?\d+(?:\.\d+)?)\s*%",
"Annualized Volatility": r"Annualized Volatility:\s*([-+]?\d+(?:\.\d+)?)\s*%",
"Sharpe": r"Sharpe\s*\(.*?\):\s*([-+]?\d+(?:\.\d+)?)",
"Max Drawdown": r"Max Drawdown:\s*([-+]?\d+(?:\.\d+)?)\s*%",
}
out = {}
for k, p in pats.items():
m = re.search(p, md, flags=re.I)
if m: out[k] = float(m.group(1))
return out
def _numeric_targets_for(ticker: str):
df = get_last_year_data(ticker)
k = compute_kpis(df, risk_free_rate=RISK_FREE_RATE)
return {
"Total Return": k["total_return"] * 100.0,
"CAGR": k["cagr"] * 100.0,
"Annualized Volatility": k["ann_vol"] * 100.0,
"Sharpe": k["sharpe"],
"Max Drawdown": k["max_drawdown"] * 100.0,
}
def _mape_percent_metrics(pred: dict, targ: dict):
keys = sorted(set(pred) & set(targ))
if not keys: return np.nan
rel_errs = []
for k in keys:
if k == "Sharpe": # exclude non-% metric from MAPE
continue
p, t = float(pred[k]), float(targ[k])
denom = max(1e-6, abs(t))
rel_errs.append(abs(p - t) / denom)
return (100.0 * float(np.mean(rel_errs))) if rel_errs else np.nan
def _section(md: str, title: str):
m = re.search(rf"##\s*{re.escape(title)}(.*?)(?=\n##\s|\Z)", md, flags=re.S)
return m.group(1).strip() if m else ""
def _extract_weights_from_md(md: str):
sec = _section(md, "Optimized Weights")
if not sec: return {}
pairs = re.findall(r"\n\|?\s*([A-Z][A-Z.\-]{0,6})\s*\|\s*([\d.]+)\s*%", sec) or \
re.findall(r"\n([A-Z][A-Z.\-]{0,6})\s+([\d.]+)\s*%", sec)
out = {}
for t, v in pairs:
try: out[t] = float(v) / 100.0
except Exception: pass
return out
def _portfolio_sanity(weights: dict, wmax=0.30, tol=0.005):
if not weights: return False
s_ok = abs(sum(weights.values()) - 1.0) <= tol
cap_ok = all((w <= wmax + 1e-9) for w in weights.values())
return bool(s_ok and cap_ok)
# ---------- 20-test suite ----------
TESTS = [
# Consolidated (single tickers)
{"prompt": "AAPL", "expect_intent": "consolidated"},
{"prompt": "NVDA", "expect_intent": "consolidated"},
{"prompt": "GOOGL", "expect_intent": "consolidated"},
{"prompt": "AMZN", "expect_intent": "consolidated"},
{"prompt": "META", "expect_intent": "consolidated"},
# News (kept for routing quality, we are not reporting news metric)
{"prompt": "news for MSFT", "expect_intent": "news"},
{"prompt": "news for TSLA", "expect_intent": "news"},
{"prompt": "news on AAPL", "expect_intent": "news"},
{"prompt": "latest headlines for NVDA", "expect_intent": "news"},
{"prompt": "news about AMZN", "expect_intent": "news"},
# Portfolio optimization / diversification
{"prompt": "optimize portfolio AAPL MSFT TSLA", "expect_intent": "portfolio"},
{"prompt": "rebalance portfolio NVDA AMD AVGO", "expect_intent": "portfolio"},
{"prompt": "diversify my portfolio META GOOGL AMZN", "expect_intent": "portfolio"},
{"prompt": "weights for SPY VTI VXUS BND", "expect_intent": "portfolio"},
{"prompt": "optimize holdings JPM BAC WFC", "expect_intent": "portfolio"},
# # Historical queries
# {"prompt": "what is the volatility for NVDA last year", "expect_intent": "historical"},
# {"prompt": "drawdown for AAPL last year", "expect_intent": "historical"},
# {"prompt": "1y trend for MSFT", "expect_intent": "historical"},
# {"prompt": "sharpe of AMZN last year", "expect_intent": "historical"},
# {"prompt": "historical analysis of META", "expect_intent": "historical"},
]
# ---------- run & report ONLY the 3 best metrics ----------
route_hits, kpi_mapes, port_passes = [], [], []
for t in TESTS:
expect = t["expect_intent"]
md = supervisor_respond(t["prompt"]) # uses your agents
route, tk = _parse_route_and_ticker(md)
# 1) Routing accuracy
route_hits.append(int(route == expect))
# 2) KPI MAPE (only when we have a single ticker route that prints KPIs)
mape = np.nan
if tk and route in ("historical", "consolidated"):
try:
pred = _extract_kpis_from_md(md)
targ = _numeric_targets_for(tk)
mape = _mape_percent_metrics(pred, targ)
except Exception:
mape = np.nan
kpi_mapes.append(mape)
# 3) Portfolio sanity (only for portfolio/consolidated routes)
if route in ("portfolio", "consolidated"):
weights = _extract_weights_from_md(md)
port_passes.append(int(_portfolio_sanity(weights)))
else:
port_passes.append(np.nan)
routing_accuracy = round(100.0 * (np.nanmean(route_hits) if route_hits else 0.0), 1)
kpi_mape_mean = (None if not np.isfinite(np.nanmean(kpi_mapes)) else round(np.nanmean(kpi_mapes), 3))
port_pass_rate = (None if not np.isfinite(np.nanmean(port_passes)) else round(100.0 * np.nanmean(port_passes), 1))
summary_3 = {
"routing_accuracy_%": routing_accuracy,
"kpi_mape_mean_%": kpi_mape_mean,
"portfolio_sanity_pass_rate_%": port_pass_rate,
}
# Print ONLY the 3 metrics
for k, v in summary_3.items():
print(f"{k}: {v}")
# === Minimal Offline Evaluation (20 tests) β€” 4 Final Metrics incl. Latency; Suppress Warnings/Logs ===
import re, time, numpy as np, pandas as pd, warnings, logging, contextlib, io, sys
# Silence Python warnings and common noisy loggers
warnings.filterwarnings("ignore")
for name in ["yfinance", "neo4j", "neo4j.notifications", "neo4j.security", "neo4j.io", "urllib3"]:
try:
lg = logging.getLogger(name)
lg.setLevel(logging.CRITICAL)
lg.propagate = False
lg.disabled = True
except Exception:
pass
# Helper to suppress stray prints from libraries during calls
@contextlib.contextmanager
def _quiet_io():
stdout, stderr = sys.stdout, sys.stderr
try:
sys.stdout, sys.stderr = io.StringIO(), io.StringIO()
yield
finally:
sys.stdout, sys.stderr = stdout, stderr
# ---------- helpers (reuse your app's behavior; no extra outputs) ----------
def _parse_route_and_ticker(md: str):
first = md.strip().splitlines()[0] if md.strip() else ""
route, ticker = "unknown", ""
if first.startswith("## Supervisor β€” Routed to Historical"):
route = "historical"; m = re.search(r"\(([^)]+)\)", first); ticker = (m.group(1) if m else "")
elif first.startswith("## Supervisor β€” Routed to News"):
route = "news"; m = re.search(r"\(([^)]+)\)", first); ticker = (m.group(1) if m else "")
elif first.startswith("## Supervisor β€” Routed to Portfolio"):
route = "portfolio"
elif first.startswith("# Consolidated View for"):
route = "consolidated"; m = re.search(r"# Consolidated View for\s+([A-Z]{1,6})", first); ticker = (m.group(1) if m else "")
if not ticker:
m = re.search(r"#\s+([A-Z]{1,6})\s+β€”\s+Last 1Y Analysis", md)
if m: ticker = m.group(1)
if not ticker:
m = re.search(r"#\s+([A-Z]{1,6})\s+β€”\s+Current News Sentiment", md)
if m: ticker = m.group(1)
return route, ticker
def _extract_kpis_from_md(md: str):
pats = {
"Total Return": r"Total Return:\s*([-+]?\d+(?:\.\d+)?)\s*%",
"CAGR": r"CAGR:\s*([-+]?\d+(?:\.\d+)?)\s*%",
"Annualized Volatility": r"Annualized Volatility:\s*([-+]?\d+(?:\.\d+)?)\s*%",
"Max Drawdown": r"Max Drawdown:\s*([-+]?\d+(?:\.\d+)?)\s*%",
"Sharpe": r"Sharpe\s*\(.*?\):\s*([-+]?\d+(?:\.\d+)?)", # excluded from MAPE but parsed for completeness
}
out = {}
for k, p in pats.items():
m = re.search(p, md, flags=re.I)
if m: out[k] = float(m.group(1))
return out
def _numeric_targets_for(ticker: str):
df = get_last_year_data(ticker)
k = compute_kpis(df, risk_free_rate=RISK_FREE_RATE)
return {
"Total Return": k["total_return"] * 100.0,
"CAGR": k["cagr"] * 100.0,
"Annualized Volatility": k["ann_vol"] * 100.0,
"Max Drawdown": k["max_drawdown"] * 100.0,
# Sharpe not used in MAPE, so we don't need it here
}
def _mape_percent_metrics(pred: dict, targ: dict):
keys = sorted(set(pred) & set(targ))
if not keys: return np.nan
rel_errs = []
for k in keys:
p, t = float(pred[k]), float(targ[k])
denom = max(1e-6, abs(t))
rel_errs.append(abs(p - t) / denom)
return (100.0 * float(np.mean(rel_errs))) if rel_errs else np.nan
def _section(md: str, title: str):
m = re.search(rf"##\s*{re.escape(title)}(.*?)(?=\n##\s|\Z)", md, flags=re.S)
return m.group(1).strip() if m else ""
def _extract_weights_from_md(md: str):
sec = _section(md, "Optimized Weights")
if not sec: return {}
pairs = re.findall(r"\n\|?\s*([A-Z][A-Z.\-]{0,6})\s*\|\s*([\d.]+)\s*%", sec) or \
re.findall(r"\n([A-Z][A-Z.\-]{0,6})\s+([\d.]+)\s*%", sec)
out = {}
for t, v in pairs:
try: out[t] = float(v) / 100.0
except Exception: pass
return out
def _portfolio_sanity(weights: dict, wmax=0.30, tol=0.005):
if not weights: return False
s_ok = abs(sum(weights.values()) - 1.0) <= tol
cap_ok = all((w <= wmax + 1e-9) for w in weights.values())
return bool(s_ok and cap_ok)
# ---------- 20-test suite ----------
TESTS = [
# Consolidated (single tickers)
{"prompt": "AAPL", "expect_intent": "consolidated"},
{"prompt": "NVDA", "expect_intent": "consolidated"},
{"prompt": "GOOGL", "expect_intent": "consolidated"},
{"prompt": "AMZN", "expect_intent": "consolidated"},
{"prompt": "META", "expect_intent": "consolidated"},
# News (kept for routing quality; latency measured too)
{"prompt": "news for MSFT", "expect_intent": "news"},
{"prompt": "news for TSLA", "expect_intent": "news"},
{"prompt": "news on AAPL", "expect_intent": "news"},
{"prompt": "latest headlines for NVDA", "expect_intent": "news"},
{"prompt": "news about AMZN", "expect_intent": "news"},
# Portfolio optimization / diversification
{"prompt": "optimize portfolio AAPL MSFT TSLA", "expect_intent": "portfolio"},
{"prompt": "rebalance portfolio NVDA AMD AVGO", "expect_intent": "portfolio"},
{"prompt": "diversify my portfolio META GOOGL AMZN", "expect_intent": "portfolio"},
{"prompt": "weights for SPY VTI VXUS BND", "expect_intent": "portfolio"},
{"prompt": "optimize holdings JPM BAC WFC", "expect_intent": "portfolio"},
]
# ---------- run & compute exactly 4 final metrics (routing, KPI MAPE, portfolio sanity, latency) ----------
route_hits, kpi_mapes, port_passes, latencies = [], [], [], []
for t in TESTS:
expect = t["expect_intent"]
with _quiet_io(): # suppress noisy prints/warnings during one inference
t0 = time.time()
md = supervisor_respond(t["prompt"]) # uses your agents
dt = time.time() - t0
latencies.append(dt)
route, tk = _parse_route_and_ticker(md)
route_hits.append(int(route == expect))
# KPI MAPE (only for routes that actually show KPIs for a single ticker)
mape = np.nan
if tk and route in ("historical", "consolidated"):
try:
pred = _extract_kpis_from_md(md)
targ = _numeric_targets_for(tk)
mape = _mape_percent_metrics(pred, targ)
except Exception:
mape = np.nan
kpi_mapes.append(mape)
# Portfolio sanity (only for portfolio/consolidated routes)
if route in ("portfolio", "consolidated"):
weights = _extract_weights_from_md(md)
port_passes.append(int(_portfolio_sanity(weights)))
else:
port_passes.append(np.nan)
routing_accuracy = round(100.0 * (np.nanmean(route_hits) if route_hits else 0.0), 1)
kpi_mape_mean = (None if not np.isfinite(np.nanmean(kpi_mapes)) else round(np.nanmean(kpi_mapes), 3))
port_pass_rate = (None if not np.isfinite(np.nanmean(port_passes)) else round(100.0 * np.nanmean(port_passes), 1))
lat_p50 = (None if not latencies else round(float(np.percentile(latencies, 50)), 3))
lat_p95 = (None if not latencies else round(float(np.percentile(latencies, 95)), 3))
# Print ONLY the 4 metrics (latency reported as a single metric with p50/p95)
print(f"routing_accuracy_%: {routing_accuracy}")
print(f"kpi_mape_mean_%: {kpi_mape_mean}")
print(f"portfolio_sanity_pass_rate_%: {port_pass_rate}")
print(f"latency_s: p50={lat_p50}, p95={lat_p95}")