# -*- coding: utf-8 -*-
"""v1_Multi_Agent.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1Whj6LVa2_xvNcS8JyXToLCNOBwx6wn7K
"""

# # === Cell 1: Setup & Installs ===
# import gc, os, sys
# _ = [gc.collect() for _ in range(3)]

# # Core libs (quiet)
# !pip -q install yfinance requests pandas numpy matplotlib tqdm \
#   langchain langchain-community langgraph transformers accelerate \
#   faiss-cpu sentence-transformers gnews neo4j scipy tabulate gradio

# # Quiet Transformers logs
# os.environ["TRANSFORMERS_VERBOSITY"] = "error"
# try:
#     from transformers.utils import logging as hf_logging
#     hf_logging.set_verbosity_error()
# except Exception:
#     pass

# print("✅ Environment ready.")

# Cell 2 — Config & Globals
import gc
import json
import logging
import math
import os
import re
import time
from datetime import datetime, timedelta, timezone

import numpy as np
import pandas as pd
import requests

logger = logging.getLogger(__name__)

# Clean memory a bit when re-running
_ = [gc.collect() for _ in range(3)]
pd.set_option("display.max_columns", None)

# ---- Keys / URIs ----
# Credentials are read from the environment so that no secret lives in source
# control.  With NEWSAPI_KEY unset the news agent falls back to GNews; with the
# Neo4j settings unset every graph write/read is skipped.
NEWSAPI_KEY = os.environ.get("NEWSAPI_KEY", "")
NEO4J_URI = os.environ.get("NEO4J_URI", "")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "")
ENABLE_NEO4J = bool(NEO4J_URI and NEO4J_PASSWORD)

# ---- Constants ----
RISK_FREE_RATE = 0.03
NEWS_LOOKBACK_DAYS = 14
PRICE_LOOKBACK_DAYS = 365 * 2
MAX_ARTICLES = 60
MAX_NEWS_PER_TICKER = 30
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

# ---- Device/logging tweaks ----
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
try:
    from transformers.utils import logging as hf_logging
    hf_logging.set_verbosity_error()
except Exception:
    pass


def today_utc_date():
    """Return today's calendar date in UTC."""
    return datetime.now(timezone.utc).date()


def days_ago(n):
    """Return the UTC calendar date ``n`` days before today."""
    return today_utc_date() - timedelta(days=n)


# Cell 3 — Common helpers: symbol resolve, embedder, Neo4j driver (UPDATED)
import yfinance as yf
import unicodedata, string, re, requests

# Zero-width / directional marks that commonly sneak into pasted text.
_INVISIBLE_MARKS = str.maketrans(
    "", "", "\u200b\u200c\u200d\u200e\u200f\u202a\u202b\u202c\u202d\u202e"
)


def _clean_text(q: str) -> str:
    """Normalize and strip invisible Unicode so 'AAPL' always parses.

    Returns an empty string for ``None``.  The text is NFKC-normalized, the
    zero-width/directional marks are removed, non-printable characters are
    dropped and surrounding whitespace is trimmed.
    """
    if q is None:
        return ""
    q = unicodedata.normalize("NFKC", str(q)).translate(_INVISIBLE_MARKS)
    # Keep only printable characters
    q = "".join(ch for ch in q if ch.isprintable())
    return q.strip()


def parse_user_query(q: str):
    """Classify free text as a ticker candidate or a company-name query.

    Returns a ``(token, kind)`` tuple where ``kind`` is ``"maybe_ticker"`` or
    ``"maybe_name"``:

    * the whole input is 1-6 letters (any case)      -> ticker candidate
    * the input contains ``$abc``                     -> ticker candidate
    * the input contains a standalone ALL-CAPS word
      of 2-6 letters as typed by the user             -> ticker candidate
    * anything else                                   -> name query with the
      filler phrases ("can i invest in", ...) removed

    Only tokens the user actually wrote in capitals are treated as tickers;
    upper-casing the whole sentence first would turn every word into a
    candidate ("Microsoft" -> "MICROS") and the name search would never run.
    """
    q = _clean_text(q)
    q_up = q.upper()

    # Exact ticker like AAPL, TSLA, SPY (letters only, up to 6)
    if re.fullmatch(r"[A-Z]{1,6}", q_up):
        return q_up, "maybe_ticker"

    # Cashtag form, e.g. "$tsla"
    cashtag = re.search(r"\$([A-Za-z]{1,6})\b", q)
    if cashtag:
        return cashtag.group(1).upper(), "maybe_ticker"

    # Standalone all-caps word in the text as typed (skips "I", "A")
    caps = re.findall(r"\b[A-Z]{2,6}\b", q)
    if caps:
        return caps[0], "maybe_ticker"

    # Otherwise treat as a name to search
    name = re.sub(
        r"(what.*price of|can i invest in|stock price of|suggest|recommend|optimi[sz]e)",
        "",
        q,
        flags=re.I,
    ).strip(" ?")
    return (name if name else q), "maybe_name"


def yahoo_symbol_search(query: str, exchanges=None):
    """Yahoo search without over-filtering exchange names (Yahoo returns 'NasdaqGS', 'NMS', etc.).

    Returns a list of ``{"symbol", "shortname", "longname", "exchange"}`` dicts
    for equity/ETF/fund/index/crypto quotes, or ``[]`` on any request or
    decoding problem.  ``exchanges`` is accepted for compatibility and unused.
    """
    url = "https://query2.finance.yahoo.com/v1/finance/search"
    params = {"q": query, "quotesCount": 10, "newsCount": 0}
    # NOTE(review): Yahoo's public endpoints have been observed to reject the
    # default python-requests User-Agent; a browser-like value is sent — confirm.
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        r = requests.get(url, params=params, headers=headers, timeout=10)
        r.raise_for_status()
        data = r.json()
    except (requests.RequestException, ValueError):
        return []
    if not isinstance(data, dict):
        return []

    wanted = ("EQUITY", "ETF", "MUTUALFUND", "INDEX", "CRYPTOCURRENCY")
    return [
        {
            "symbol": quote.get("symbol"),
            "shortname": quote.get("shortname"),
            "longname": quote.get("longname"),
            "exchange": quote.get("exchange"),
        }
        for quote in data.get("quotes", [])
        if quote.get("quoteType") in wanted
    ]


def resolve_to_ticker(user_text: str):
    """Resolve free text to a ticker symbol.

    A ticker candidate is checked against Yahoo search: it is returned as-is
    when Yahoo lists that exact symbol or when the lookup yields nothing (e.g.
    offline); otherwise the top search hit is used, so "Tesla" resolves to the
    listed symbol instead of the literal "TESLA".  Name queries use the top
    search hit.  As a last resort the first A–Z run of the token is returned.

    Raises:
        ValueError: when nothing ticker-like can be derived from the input.
    """
    token, kind = parse_user_query(user_text)
    matches = yahoo_symbol_search(token) if token else []
    symbols = [m["symbol"] for m in matches if m.get("symbol")]

    if kind == "maybe_ticker" and token:
        if not symbols or any(s.upper() == token for s in symbols):
            return token
    if symbols:
        return symbols[0]

    # Last resort: pick first A–Z run
    hits = re.findall(r"[A-Z]{1,6}", (token or "").upper())
    if hits:
        return hits[0]
    raise ValueError(f"Could not resolve '{user_text}' to a ticker.")


# ---- Shared embedder (one per runtime)
from sentence_transformers import SentenceTransformer
_embedder = SentenceTransformer(EMBED_MODEL_NAME, device="cpu")  # CPU is fine for MiniLM

# ---- Neo4j driver (one per runtime)
from neo4j import GraphDatabase
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) if ENABLE_NEO4J else None


def _have_neo4j_driver():
    """Return True when Neo4j is enabled and a driver object exists."""
    return ENABLE_NEO4J and (driver is not None)


# Cell 4 — Shared TinyLlama chat writer for short summaries
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

LLM_ID = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
tok = AutoTokenizer.from_pretrained(LLM_ID, use_fast=True)
llm_model = AutoModelForCausalLM.from_pretrained(LLM_ID, device_map="auto", torch_dtype="auto")
gen_pipe = pipeline("text-generation", model=llm_model, tokenizer=tok, max_new_tokens=600, do_sample=False)


def chat_summarize(system_msg: str, user_msg: str) -> str:
    """Generate a short completion for a system + user message pair.

    The chat template of the tokenizer is used when available; if building or
    running the templated prompt fails, the raw user message is sent instead.
    """
    chat = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg},
    ]
    try:
        prompt = tok.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
        return gen_pipe(prompt, return_full_text=False)[0]["generated_text"].strip()
    except Exception:
        return gen_pipe(user_msg, return_full_text=False)[0]["generated_text"].strip()


# Cell 5 — Historical Trends Agent (UPDATED)
import matplotlib.pyplot as plt
import faiss
from langchain.tools import tool
import numpy as np, pandas as pd, math, json, re
from datetime import timedelta


# ---------- Data & KPIs ----------
def get_last_year_data(ticker: str):
    """Download roughly one year of daily OHLCV bars for ``ticker``.

    Returns a DataFrame indexed by date with flat columns
    Open/High/Low/Close/Adj Close/Volume.

    Raises:
        ValueError: when the download returns no rows.
    """
    end = today_utc_date()
    start = end - timedelta(days=365)
    # `end` is pushed one day forward so that today's bar is included.
    df = yf.download(
        ticker,
        start=str(start),
        end=str(end + timedelta(days=1)),
        auto_adjust=False,
        progress=False,
    )
    if df.empty:
        raise ValueError(f"No data for {ticker}.")
    # Some yfinance versions return (field, ticker) MultiIndex columns even for
    # one symbol; flatten so row access like r["Open"] yields scalars.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    df = df[["Open", "High", "Low", "Close", "Adj Close", "Volume"]].copy()
    df.index = pd.to_datetime(df.index)
    return df


def _to_scalar(x):
    """Coerce a NumPy/pandas scalar or 1-element container to ``float``."""
    try:
        return float(getattr(x, "item", lambda: x)())
    except Exception:
        try:
            return float(x)
        except Exception:
            return float(np.asarray(x).reshape(-1)[0])


def compute_kpis(df: pd.DataFrame, risk_free_rate=RISK_FREE_RATE):
    """Compute return/risk KPIs from the ``Adj Close`` column of ``df``.

    Returns a dict with start/end price, total return, CAGR, annualized
    volatility, Sharpe ratio, max drawdown, best/worst day as
    ``(YYYY-MM-DD, return)`` tuples, a month-end return table and the number
    of price observations (``n_days``).

    Raises:
        ValueError: when fewer than two valid prices are available.
    """
    adj = df["Adj Close"]
    if isinstance(adj, pd.DataFrame) and adj.shape[1] == 1:
        adj = adj.squeeze("columns")
    adj = pd.to_numeric(adj, errors="coerce").dropna()

    n = int(adj.shape[0])
    if n < 2:
        raise ValueError("Not enough data points to compute KPIs.")

    start_price = _to_scalar(adj.iloc[0])
    end_price = _to_scalar(adj.iloc[-1])
    total_return = (end_price / start_price) - 1.0
    rets = adj.pct_change().dropna()

    # n prices span n - 1 daily return periods; annualize over 252 trading days.
    cagr = (1.0 + total_return) ** (252.0 / (n - 1)) - 1.0
    ann_vol = _to_scalar(rets.std()) * np.sqrt(252.0)
    sharpe = (cagr - risk_free_rate) / (ann_vol + 1e-12)  # epsilon avoids 0-division

    drawdowns = adj / adj.cummax() - 1.0
    max_dd = _to_scalar(drawdowns.min())

    def _fmt_date(d):
        try:
            return pd.to_datetime(d).strftime("%Y-%m-%d")
        except Exception:
            return str(d)

    best_day = (_fmt_date(rets.idxmax()), _to_scalar(rets.max()))
    worst_day = (_fmt_date(rets.idxmin()), _to_scalar(rets.min()))

    # "ME" (month end) is the current alias; older pandas only knows "M".
    try:
        month_end = adj.resample("ME").last()
    except ValueError:
        month_end = adj.resample("M").last()
    monthly = month_end.pct_change().dropna()

    return {
        "start_price": float(start_price),
        "end_price": float(end_price),
        "total_return": float(total_return),
        "cagr": float(cagr),
        "ann_vol": float(ann_vol),
        "sharpe": float(sharpe),
        "max_drawdown": float(max_dd),
        "best_day": (best_day[0], float(best_day[1])),
        "worst_day": (worst_day[0], float(worst_day[1])),
        "monthly_table": monthly.to_frame("Monthly Return"),
        "n_days": int(n),
    }


# ---------- Neo4j write ----------
def store_to_neo4j(ticker: str, df: pd.DataFrame):
    """Upsert a ``Stock`` node and one ``PriceBar`` node per row of ``df``.

    Does nothing when Neo4j is disabled.  ``df`` must have flat
    Open/High/Low/Close/Adj Close/Volume columns and a datetime index.
    Bars are written in chunks of 250 rows.
    """
    if not _have_neo4j_driver():
        return

    info = yf.Ticker(ticker).info or {}
    name = info.get("shortName") or info.get("longName") or ticker
    exchange = info.get("exchange") or info.get("fullExchangeName") or "UNKNOWN"

    rows = []
    for d, r in df.iterrows():
        day = d.date().isoformat()
        volume = r["Volume"]
        rows.append({
            "id": f"{ticker}_{day}",
            "date": day,
            "open": float(r["Open"]),
            "high": float(r["High"]),
            "low": float(r["Low"]),
            "close": float(r["Close"]),
            "adj_close": float(r["Adj Close"]),
            "volume": 0 if pd.isna(volume) else int(volume),
        })

    with driver.session() as session:
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (s:Stock) REQUIRE s.symbol IS UNIQUE;")
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (p:PriceBar) REQUIRE p.id IS UNIQUE;")
        session.run(
            """MERGE (s:Stock {symbol:$symbol})
               SET s.name=$name, s.exchange=$ex""",
            symbol=ticker, name=name, ex=exchange,
        )
        chunk = 250
        for i in range(0, len(rows), chunk):
            session.run(
                """
                UNWIND $rows AS r
                MERGE (p:PriceBar {id:r.id})
                SET p.date=r.date, p.open=r.open, p.high=r.high, p.low=r.low,
                    p.close=r.close, p.adj_close=r.adj_close, p.volume=r.volume
                WITH p
                MATCH (s:Stock {symbol:$symbol})
                MERGE (s)-[:HAS_PRICE]->(p)
                """,
                rows=rows[i:i + chunk], symbol=ticker,
            )


# ---------- Facts / Retriever ----------
def build_fact_corpus(ticker: str, kpis: dict):
    """Render the KPI dict as one plain-text fact sentence per metric."""
    return [
        f"{ticker} total return over last 1Y: {kpis['total_return']:.6f}",
        f"{ticker} CAGR (last 1Y approximated): {kpis['cagr']:.6f}",
        f"{ticker} annualized volatility (1Y): {kpis['ann_vol']:.6f}",
        f"{ticker} Sharpe ratio (rf={RISK_FREE_RATE:.2%}): {kpis['sharpe']:.4f}",
        f"{ticker} max drawdown (1Y): {kpis['max_drawdown']:.6f}",
        f"{ticker} best day (1Y): {kpis['best_day'][0]} return {kpis['best_day'][1]:.6f}",
        f"{ticker} worst day (1Y): {kpis['worst_day'][0]} return {kpis['worst_day'][1]:.6f}",
        f"{ticker} start price (Adj Close): {kpis['start_price']:.6f}",
        f"{ticker} end price (Adj Close): {kpis['end_price']:.6f}",
        f"{ticker} period days counted: {kpis['n_days']}",
    ]


class FactRetriever:
    """Inner-product FAISS index over normalized embeddings of fact sentences."""

    def __init__(self, sentences):
        self.sentences = sentences
        X = _embedder.encode(sentences, convert_to_numpy=True, normalize_embeddings=True)
        self.index = faiss.IndexFlatIP(X.shape[1])
        self.index.add(X)

    def query(self, q, top_k=5):
        """Return up to ``top_k`` ``(sentence, score)`` pairs, best first."""
        qv = _embedder.encode([q], convert_to_numpy=True, normalize_embeddings=True)
        D, I = self.index.search(qv, top_k)
        # FAISS pads with -1 when fewer than top_k vectors exist; without the
        # guard a -1 would silently index the last sentence.
        return [
            (self.sentences[i], float(D[0][j]))
            for j, i in enumerate(I[0])
            if i >= 0
        ]


# ---------- Tools (LangChain) ----------
_GLOBAL_HIST = {"latest": {}}


@tool
def analyze_last_year(ticker: str) -> str:
    """Fetch last 1Y OHLCV, compute KPIs, build retriever, write Neo4j, return compact JSON."""
    df = get_last_year_data(ticker)
    kpis = compute_kpis(df, risk_free_rate=RISK_FREE_RATE)
    _GLOBAL_HIST["latest"][ticker] = {
        "df": df,
        "kpis": kpis,
        "retriever": FactRetriever(build_fact_corpus(ticker, kpis)),
    }
    if _have_neo4j_driver():
        # The graph write is best-effort: a failure must not block the
        # analysis, but it is logged instead of being dropped silently.
        try:
            store_to_neo4j(ticker, df)
        except Exception:
            logger.warning("Neo4j price write failed for %s", ticker, exc_info=True)
    return json.dumps({
        "ticker": ticker,
        "n_days": kpis["n_days"],
        "start_price": kpis["start_price"],
        "end_price": kpis["end_price"],
        "total_return_pct": kpis["total_return"] * 100,
        "cagr_pct": kpis["cagr"] * 100,
        "ann_vol_pct": kpis["ann_vol"] * 100,
        "sharpe": kpis["sharpe"],
        "max_drawdown_pct": kpis["max_drawdown"] * 100,
        "best_day": kpis["best_day"],
        "worst_day": kpis["worst_day"],
    })


@tool
def show_monthly_returns(ticker: str) -> str:
    """Return a markdown table of monthly returns (XX.XX%)."""
    if ticker not in _GLOBAL_HIST["latest"]:
        return "Please run analyze_last_year first."
    mt = _GLOBAL_HIST["latest"][ticker]["kpis"]["monthly_table"].copy()
    try:
        mt.index = pd.to_datetime(mt.index).strftime("%Y-%m")
    except Exception:
        mt.index = pd.Index([str(x)[:7] for x in mt.index])
    mt["Monthly Return"] = (mt["Monthly Return"] * 100.0).map(lambda v: f"{v:.2f}%")
    return mt.to_markdown()


@tool
def neo4j_check_latest_close(ticker: str) -> str:
    """Read most recent adj_close for ticker from Neo4j (if enabled)."""
    if not _have_neo4j_driver():
        return "Neo4j check skipped (ENABLE_NEO4J=False)."
    # This is only a data check for the report, so a connection problem is
    # reported in the returned line rather than aborting the whole graph run.
    try:
        with driver.session() as session:
            res = session.run(
                """
                MATCH (s:Stock {symbol:$symbol})-[:HAS_PRICE]->(p:PriceBar)
                RETURN p.date AS date, p.adj_close AS adj_close
                ORDER BY p.date DESC LIMIT 1
                """,
                symbol=ticker,
            ).single()
    except Exception as exc:
        return f"Neo4j check failed: {exc}"
    if not res:
        return "Neo4j check: no records yet."
    return f"Neo4j latest adj_close for {ticker} on {res['date']}: {float(res['adj_close']):.4f}"


# Safe prettifier (UPDATED: more robust, no regex-in-fstring)
def _prettify_fact_line(line: str) -> str:
    """Turn a raw fact bullet into a reader-friendly bullet.

    Fractions are shown as percentages, the Sharpe ratio is rounded to two
    decimals, best/worst-day lines are rebuilt, and a leading "- TICKER" is
    reduced to "- ".
    """
    s = line.strip()
    # Remove any trailing "(score=...)" fragments
    s = re.sub(r"\s*\(score=.*?\)\s*$", "", s)

    def _as_pct(m, label):
        try:
            return f"{label}{float(m.group(2))*100:.2f}%"
        except Exception:
            return m.group(0)

    num = r"([-+]?\d*\.?\d+)"
    pct_rules = (
        (r"(total return over last 1Y:\s*)" + num, "Total return (1Y): "),
        (r"(CAGR.*?:\s*)" + num, "CAGR (1Y): "),
        (r"(annualized volatility.*?:\s*)" + num, "Annualized volatility: "),
        (r"(max drawdown.*?:\s*)" + num, "Max drawdown: "),
    )
    for pattern, label in pct_rules:
        s = re.sub(pattern, lambda m, label=label: _as_pct(m, label), s, flags=re.I)
    s = re.sub(r"(Sharpe ratio.*?:\s*)" + num,
               lambda m: f"Sharpe ratio: {float(m.group(2)):.2f}", s, flags=re.I)

    # Best/Worst day — rebuild line unconditionally if pattern seen
    bm = re.search(r"best day.*?:\s*(\d{4}-\d{2}-\d{2}).*?return\s*([-+]?\d*\.?\d+)", s, flags=re.I)
    if bm:
        best_txt = f"Best day: {bm.group(1)} (+{float(bm.group(2))*100:.2f}%)"
        # A callable replacement keeps the text literal (no escape processing).
        s = re.sub(r"best day.*", lambda _m: best_txt, s, flags=re.I)
    wm = re.search(r"worst day.*?:\s*(\d{4}-\d{2}-\d{2}).*?return\s*([-+]?\d*\.?\d+)", s, flags=re.I)
    if wm:
        worst_txt = f"Worst day: {wm.group(1)} ({abs(float(wm.group(2))*100):.2f}% decline)"
        s = re.sub(r"worst day.*", lambda _m: worst_txt, s, flags=re.I)

    # Remove leading "- TICKER" if present
    s = re.sub(r"^-\s*[A-Z]{1,6}\s*", "- ", s)
    return s


@tool("retrieve_facts")
def retrieve_facts_single(query: str) -> str:
    """INPUT: 'TICKER | question' -> pretty bullets."""
    if "|" in query:
        ticker, question = [x.strip() for x in query.split("|", 1)]
    else:
        ticker, question = query.strip(), "performance summary"
    if ticker not in _GLOBAL_HIST["latest"]:
        return "Please run analyze_last_year first."
    hits = _GLOBAL_HIST["latest"][ticker]["retriever"].query(question, top_k=5)
    return "\n".join(_prettify_fact_line(f"- {txt}") for (txt, _score) in hits)


# ---------- LangGraph flow ----------
from langgraph.graph import StateGraph, END
from typing import TypedDict


class HistState(TypedDict, total=False):
    """State passed between the nodes of the historical-trends graph."""
    ticker: str
    analysis_json: str
    monthly_md: str
    neo4j_line: str
    facts_md: str
    final_markdown: str


def _fmt2(x):
    """Format ``x`` with two decimals; return "0.00" when it is not numeric."""
    try:
        return f"{float(x):.2f}"
    except (TypeError, ValueError):
        return "0.00"


def _pros_cons(js):
    """Derive rule-based pros/cons bullets from the analysis JSON dict."""
    pros, cons = [], []
    tr = float(js.get("total_return_pct", 0))
    sh = float(js.get("sharpe", 0))
    vol = float(js.get("ann_vol_pct", 0))
    mdd = float(js.get("max_drawdown_pct", 0))

    if tr > 0:
        pros.append("Positive 1-year total return.")
    if sh > 1.0:
        pros.append("Good risk-adjusted performance (Sharpe > 1).")
    if vol < 25.0:
        pros.append("Moderate volatility profile.")
    if abs(mdd) <= 20.0:
        pros.append("Relatively contained drawdowns.")

    if tr <= 0:
        cons.append("Negative 1-year total return.")
    if sh < 0.3:
        cons.append("Weak risk-adjusted performance (low Sharpe).")
    if vol >= 30.0:
        cons.append("Elevated price volatility.")
    if abs(mdd) >= 25.0:
        cons.append("Deep drawdowns observed.")

    if not pros:
        pros.append("No major positives indicated by last-year metrics.")
    if not cons:
        cons.append("No major cautions indicated by last-year metrics.")
    return pros, cons


def n_h_analyze(s: HistState) -> HistState:
    """Graph node: run the KPI analysis tool."""
    return {"analysis_json": analyze_last_year.invoke(s["ticker"])}


def n_h_monthly(s: HistState) -> HistState:
    """Graph node: render the monthly-returns table."""
    return {"monthly_md": show_monthly_returns.invoke(s["ticker"])}


def n_h_neo4j(s: HistState) -> HistState:
    """Graph node: read back the latest stored close as a data check."""
    return {"neo4j_line": neo4j_check_latest_close.invoke(s["ticker"])}


def n_h_facts(s: HistState) -> HistState:
    """Graph node: retrieve the facts most related to risk and drawdowns."""
    q = f"{s['ticker']} | risk-adjusted performance and drawdowns"
    return {"facts_md": retrieve_facts_single.invoke(q)}


def n_h_write(s: HistState) -> HistState:
    """Graph node: assemble the final markdown report for the ticker."""
    try:
        k = json.loads(s.get("analysis_json", "{}"))
    except Exception:
        k = {}
    t = s["ticker"]

    tr = _fmt2(k.get("total_return_pct", 0))
    cg = _fmt2(k.get("cagr_pct", 0))
    av = _fmt2(k.get("ann_vol_pct", 0))
    sh = _fmt2(k.get("sharpe", 0))
    md = _fmt2(k.get("max_drawdown_pct", 0))

    bd = k.get("best_day", ["", 0.0])
    wd = k.get("worst_day", ["", 0.0])
    bd_d, wd_d = bd[0], wd[0]
    # Signed formatting: the worst-day return is already negative, so a
    # hard-coded "-" prefix would print a double negative ("--3.21%").
    bd_r = f"{float(bd[1]) * 100:+.2f}"
    wd_r = f"{float(wd[1]) * 100:+.2f}"

    system_msg = "You are a concise equity analyst who writes clear, neutral summaries."
    user_msg = (
        f"Write a 2–3 sentence summary for {t} using ONLY: "
        f"Return {tr}%, CAGR {cg}%, Vol {av}%, Sharpe {sh}, MaxDD {md}%, "
        f"Best {bd_d} ({bd_r}%), Worst {wd_d} ({wd_r}%)."
    )
    try:
        summary = chat_summarize(system_msg, user_msg)
    except Exception:
        summary = (
            f"{t} delivered {tr}% 1Y return (vol {av}%, Sharpe {sh}). "
            f"Max drawdown {md}%. Best day {bd_d} ({bd_r}%), worst {wd_d} ({wd_r}%)."
        )

    pros, cons = _pros_cons(k)
    lines = [
        f"# {t} — Last 1Y Analysis",
        summary,
        "\n## Key Metrics",
        f"- Total Return: {tr}%",
        f"- CAGR: {cg}%",
        f"- Annualized Volatility: {av}%",
        f"- Sharpe (rf={RISK_FREE_RATE:.2%}): {sh}",
        f"- Max Drawdown: {md}%",
        f"- Best Day: {bd_d} ({bd_r}%)",
        f"- Worst Day: {wd_d} ({wd_r}%)",
        "\n## Monthly Returns",
        s.get("monthly_md", "_No monthly table._"),
        "\n## Pros",
    ]
    lines += [f"- {p}" for p in pros]
    lines.append("\n## Cons")
    lines += [f"- {c}" for c in cons]
    lines.append("\n### Data checks")
    lines.append(f"- {s.get('neo4j_line','')}")
    if s.get("facts_md", "").strip():
        lines.append("- Facts:")
        lines += s["facts_md"].splitlines()
    lines.append("\n*This is not financial advice.*")
    return {"final_markdown": "\n".join(lines)}


wf_h = StateGraph(HistState)
wf_h.add_node("analyze", n_h_analyze)
wf_h.add_node("monthly", n_h_monthly)
wf_h.add_node("neo4j", n_h_neo4j)
wf_h.add_node("facts", n_h_facts)
wf_h.add_node("final", n_h_write)
wf_h.set_entry_point("analyze")
wf_h.add_edge("analyze", "monthly")
wf_h.add_edge("monthly", "neo4j")
wf_h.add_edge("neo4j", "facts")
wf_h.add_edge("facts", "final")
wf_h.add_edge("final", END)
hist_agent = wf_h.compile()


# Helper to run by already-resolved ticker (ADDED)
def run_hist_agent_ticker(ticker: str) -> str:
    """Run the historical agent for a known ticker; return the markdown report."""
    out = hist_agent.invoke({"ticker": ticker})
    return out.get("final_markdown", "")


def run_hist_agent(user_input: str):
    """Resolve free text to a ticker, run the historical agent, return ``(markdown, ticker)``."""
    ticker = resolve_to_ticker(user_input)
    out = hist_agent.invoke({"ticker": ticker})
    return out.get("final_markdown", ""), ticker


# Cell 6 — News Analysis Agent (FIXED)
from urllib.parse import urlparse
import logging
import math, json, re, requests
import pandas as pd
import faiss
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline as hfpipe

logger = logging.getLogger(__name__)

# ---- FinBERT for sentiment (shared for Portfolio too) ----
FINBERT_ID = "yiyanghkust/finbert-tone"
tok_snt = AutoTokenizer.from_pretrained(FINBERT_ID, use_fast=True)
mdl_snt = AutoModelForSequenceClassification.from_pretrained(FINBERT_ID, device_map="auto", torch_dtype="auto")
sentiment_pipe = hfpipe("text-classification", model=mdl_snt, tokenizer=tok_snt, top_k=None, truncation=True)
print("FinBERT ready:", FINBERT_ID)

# ---- Fetchers ----
from gnews import GNews

_NEWS_COLUMNS = ["title", "description", "content", "source", "publishedAt", "url"]


def fetch_news_newsapi(query: str, from_date: str, to_date: str, page_size=100, api_key: str = ""):
    """Query NewsAPI ``/v2/everything`` and return normalized article dicts.

    Returns ``[]`` when no API key is given, on a non-200 response or on any
    request/decoding error.
    """
    if not api_key:
        return []
    url = "https://newsapi.org/v2/everything"
    params = {
        "q": query,
        "language": "en",
        "from": from_date,
        "to": to_date,
        "sortBy": "publishedAt",
        "pageSize": min(page_size, 100),
        "apiKey": api_key,
    }
    try:
        r = requests.get(url, params=params, timeout=15)
        if r.status_code != 200:
            return []
        data = r.json()
    except Exception:
        return []
    return [
        {
            "title": a.get("title") or "",
            "description": a.get("description") or "",
            "content": a.get("content") or "",
            "source": (a.get("source") or {}).get("name") or "",
            "publishedAt": a.get("publishedAt") or "",
            "url": a.get("url") or "",
        }
        for a in data.get("articles", [])
    ]


def fetch_news_gnews(query: str, max_results=50):
    """Fetch recent headlines through GNews and return normalized article dicts."""
    g = GNews(language='en', country='US', period=f"{NEWS_LOOKBACK_DAYS}d", max_results=max_results)
    try:
        hits = g.get_news(query)
    except Exception:
        hits = []
    return [
        {
            "title": h.get("title") or "",
            "description": h.get("description") or "",
            "content": "",
            "source": (h.get("publisher") or {}).get("title") or "",
            "publishedAt": h.get("published date") or "",
            "url": h.get("url") or "",
        }
        for h in hits or []
    ]


def fetch_latest_news(company: str, ticker: str):
    """Collect recent articles for a company, NewsAPI first and GNews as fallback.

    Returns a DataFrame with the columns title/description/content/source/
    publishedAt/url, de-duplicated by URL and title, newest first and capped
    at ``MAX_ARTICLES`` rows.  ``publishedAt`` holds UTC timestamps (``NaT``
    when the date could not be parsed).
    """
    to_date = today_utc_date().isoformat()
    from_date = days_ago(NEWS_LOOKBACK_DAYS).isoformat()
    q = f'"{company}" OR {ticker}'

    rows = []
    if NEWSAPI_KEY:
        rows.extend(fetch_news_newsapi(q, from_date, to_date, page_size=MAX_ARTICLES, api_key=NEWSAPI_KEY))
    if not rows:
        rows.extend(fetch_news_gnews(f"{company} {ticker}", max_results=MAX_ARTICLES))
    if not rows:
        return pd.DataFrame(columns=_NEWS_COLUMNS)

    df = pd.DataFrame(rows).fillna("")

    def _to_ts(x):
        # Parsed per value: the two providers use different date formats.
        try:
            return pd.to_datetime(x, utc=True)
        except Exception:
            return pd.NaT

    df["publishedAt"] = df["publishedAt"].apply(_to_ts)

    # Missing values were replaced by "" above, so empty strings (not NaN)
    # mark unusable rows.
    has_title = df["title"].astype(str).str.strip().ne("")
    has_url = df["url"].astype(str).str.strip().ne("")
    df = (
        df[has_title & has_url]
        .drop_duplicates(subset=["url"])
        .drop_duplicates(subset=["title"])
        .sort_values("publishedAt", ascending=False)
        .head(MAX_ARTICLES)
        .reset_index(drop=True)
    )
    return df


# ---- Filters & weights ----
DOMAIN_BLOCKLIST = {
    "pypi.org", "github.com", "medium.com", "substack.com", "reddit.com",
    "applech2.com", "macupdate.com", "investingideas.com", "etfdailynews.com",
    "marketbeat.com", "gurufocus.com",
}
DOMAIN_QUALITY = {
    "reuters.com": 1.5, "bloomberg.com": 1.5, "ft.com": 1.5, "wsj.com": 1.5,
    "cnbc.com": 1.4, "barrons.com": 1.3, "forbes.com": 1.1, "theverge.com": 1.2,
    "techcrunch.com": 1.2, "marketwatch.com": 1.0, "investors.com": 1.0,
    "yahoo.com": 1.0, "seekingalpha.com": 0.7,
}
# Extra disambiguation for tickers that are common words
AMBIGUOUS_TICKERS = {
    "SPY": ["spdr", "s&p 500", "spdr s&p 500", "etf", "spdr s&p 500 etf trust", "nysearca:spy"],
}
# Heuristic list of company-name words that say nothing about which company is meant.
_GENERIC_NAME_TOKENS = {
    "the", "and", "inc", "incorporated", "corp", "corporation", "company",
    "ltd", "limited", "plc", "llc", "group", "holdings", "com",
    "etf", "trust", "fund",
}


def _domain(url: str):
    """Return the lower-cased host of ``url`` without a leading ``www.``."""
    try:
        d = urlparse(url).netloc.lower()
        return d[4:] if d.startswith("www.") else d
    except Exception:
        return ""


def _known_domain(domain: str, known):
    """Return the entry of ``known`` that equals ``domain`` or is a parent of it.

    ``finance.yahoo.com`` matches ``yahoo.com``.  Returns ``None`` if nothing
    matches.
    """
    parts = (domain or "").split(".")
    for i in range(max(1, len(parts) - 1)):
        candidate = ".".join(parts[i:])
        if candidate in known:
            return candidate
    return None


def _mostly_english(s: str) -> bool:
    """Cheap language check: at least 85% ASCII characters (empty text passes)."""
    s = (s or "").strip()
    if not s:
        return True
    ascii_ratio = sum(1 for ch in s if ord(ch) < 128) / max(1, len(s))
    return ascii_ratio >= 0.85


def _company_keywords(company: str, ticker: str):
    """Return distinctive name tokens (length > 2, non-generic) plus the ticker."""
    toks = [
        t for t in re.findall(r"[A-Za-z0-9]+", company or "")
        if len(t) > 2 and t.lower() not in _GENERIC_NAME_TOKENS
    ]
    toks.append(ticker.upper())
    return sorted(set(toks), key=str.lower)


def _mentions(text: str, keyword: str) -> bool:
    """True when ``keyword`` occurs in lower-cased ``text`` as a whole token.

    Plain substring tests let short tickers such as "F" or "GM" match almost
    every headline.
    """
    pattern = rf"(?<![a-z0-9]){re.escape(keyword.lower())}(?![a-z0-9])"
    return re.search(pattern, text) is not None


def clean_filter_news(df: pd.DataFrame, company: str, ticker: str) -> pd.DataFrame:
    """Drop blocked/irrelevant articles and attach ``domain``, ``source_w``, ``rel_w``.

    An article is kept when its title+description is mostly English, mentions
    a company keyword or the ticker as a whole token and, for tickers listed
    in ``AMBIGUOUS_TICKERS``, also contains one of the disambiguation phrases.
    """
    if df.empty:
        return df
    df = df.copy()
    df["domain"] = df["url"].map(_domain)

    # Normalize Google News aggregator to original publisher (approx)
    mask_g = df["domain"].str.contains("news.google", na=False, regex=False)
    df.loc[mask_g, "domain"] = (
        df.loc[mask_g, "source"].fillna("").str.lower().str.replace(r"\s+", "", regex=True)
    )

    blocked = pd.Series(
        [_known_domain(d, DOMAIN_BLOCKLIST) is not None for d in df["domain"]],
        index=df.index, dtype=bool,
    )
    df = df.loc[~blocked].copy()

    kw = _company_keywords(company, ticker)
    name_kw = [k for k in kw if k.lower() != ticker.lower()]
    amb = AMBIGUOUS_TICKERS.get(ticker.upper())

    def _text(row):
        return f"{row.get('title','')} {row.get('description','')}".lower()

    def relevant(row):
        text = _text(row)
        if not _mostly_english(text):
            return False
        if not any(_mentions(text, k) for k in kw):
            return False
        if amb and not any(a in text for a in amb):
            return False
        return True

    def rel_w(row):
        text = _text(row)
        has_t = _mentions(text, ticker)
        has_c = any(_mentions(text, c) for c in name_kw)
        return 1.3 if (has_t and has_c) else (1.1 if (has_t or has_c) else 1.0)

    # Explicit boolean Series / lists keep these steps well-defined on an
    # empty frame, where DataFrame.apply(axis=1) changes its return type.
    keep = pd.Series([relevant(r) for _, r in df.iterrows()], index=df.index, dtype=bool)
    df = df.loc[keep].copy()

    def _quality(domain):
        match = _known_domain(domain, DOMAIN_QUALITY)
        return DOMAIN_QUALITY[match] if match else 0.9

    df["source_w"] = [_quality(d) for d in df["domain"]]
    df["rel_w"] = [rel_w(r) for _, r in df.iterrows()]
    return df.reset_index(drop=True)


# ---- Sentiment aggregation ----
def sentiment_label_scores(text: str):
    """Score ``text`` with FinBERT; return ``(label, p_pos, p_neu, p_neg)``.

    Blank text is reported as neutral with probability 1.  Only the first 512
    characters are scored.
    """
    if not (text or "").strip():
        return "neutral", 0.0, 1.0, 0.0
    out = sentiment_pipe(text[:512])[0]
    probs = {d["label"].lower(): float(d["score"]) for d in out}
    pos = probs.get("positive", 0.0)
    neu = probs.get("neutral", 0.0)
    neg = probs.get("negative", 0.0)
    if pos > max(neu, neg):
        label = "positive"
    elif neg > max(pos, neu):
        label = "negative"
    else:
        label = "neutral"
    return label, pos, neu, neg


def _empty_news_result(company: str, ticker: str, df: pd.DataFrame) -> dict:
    """Summary dict used when no article survives fetching/filtering."""
    return {
        "ticker": ticker, "company": company, "n_articles": 0,
        "overall_label": "unknown", "overall_score": 0.0,
        "pos_pct": 0.0, "neu_pct": 0.0, "neg_pct": 0.0, "df": df,
    }


def _store_news_to_neo4j(company: str, ticker: str, df: pd.DataFrame) -> None:
    """Upsert the ``Stock`` node and one ``Article`` node per scored row."""
    # Only plain Python values are sent: pandas timestamps become ISO strings
    # and a missing date becomes "".
    rows = []
    for rec in df.to_dict(orient="records"):
        ts = rec.get("publishedAt")
        rows.append({
            "url": str(rec.get("url", "")),
            "title": str(rec.get("title", "")),
            "source": str(rec.get("source", "")),
            "publishedAt": "" if pd.isna(ts) else pd.Timestamp(ts).isoformat(),
            "label": str(rec.get("label", "")),
            "p_pos": float(rec.get("p_pos", 0.0)),
            "p_neu": float(rec.get("p_neu", 0.0)),
            "p_neg": float(rec.get("p_neg", 0.0)),
            "domain": str(rec.get("domain", "")),
            "source_w": float(rec.get("source_w", 0.0)),
            "rel_w": float(rec.get("rel_w", 0.0)),
        })
    with driver.session() as session:
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (s:Stock) REQUIRE s.symbol IS UNIQUE;")
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (a:Article) REQUIRE a.url IS UNIQUE;")
        session.run("MERGE (s:Stock {symbol:$s}) SET s.company=$c", s=ticker, c=company)
        session.run(
            """
            UNWIND $rows AS r
            MERGE (a:Article {url:r.url})
            SET a.title=r.title, a.source=r.source, a.publishedAt=toString(r.publishedAt),
                a.label=r.label, a.p_pos=r.p_pos, a.p_neu=r.p_neu, a.p_neg=r.p_neg,
                a.domain=r.domain, a.source_w=r.source_w, a.rel_w=r.rel_w
            WITH a
            MATCH (s:Stock {symbol:$s})
            MERGE (s)-[:HAS_NEWS]->(a)
            """,
            rows=rows, s=ticker,
        )


def analyze_and_store_news(company: str, ticker: str):
    """Fetch, filter and score news; persist to Neo4j; return a summary dict.

    Each article is weighted by recency (``exp(-0.25 * age_days)``), source
    quality and relevance.  ``overall_score`` is the weighted mean of
    ``p_pos - p_neg``; scores above 0.10 / below -0.10 are labelled
    positive / negative, anything in between neutral.  The returned dict also
    carries the scored DataFrame under ``"df"``.
    """
    df_raw = fetch_latest_news(company, ticker)
    if df_raw.empty:
        return _empty_news_result(company, ticker, df_raw)

    df = clean_filter_news(df_raw, company, ticker)
    if df.empty:
        return _empty_news_result(company, ticker, df)

    labels, pos_p, neu_p, neg_p, w_rec = [], [], [], [], []
    now = pd.Timestamp.now(tz="UTC")
    for _, r in df.iterrows():
        text = (r["title"] + ". " + r.get("description", "")).strip()
        label, ppos, pneu, pneg = sentiment_label_scores(text)
        labels.append(label)
        pos_p.append(ppos)
        neu_p.append(pneu)
        neg_p.append(pneg)
        published = r["publishedAt"]
        # Articles without a usable date are treated as age 0 (full weight).
        if pd.isna(published):
            age_days = 0.0
        else:
            age_days = max(0.0, (now - published).total_seconds() / 86400.0)
        w_rec.append(math.exp(-0.25 * age_days))

    df["label"] = labels
    df["p_pos"] = pos_p
    df["p_neu"] = neu_p
    df["p_neg"] = neg_p
    df["w_recency"] = w_rec
    df["w_total"] = df["w_recency"] * df["source_w"] * df["rel_w"]
    df["signed"] = df["w_total"] * (df["p_pos"] - df["p_neg"])

    denom = df["w_total"].sum() + 1e-9  # epsilon avoids 0-division
    overall_score = df["signed"].sum() / denom

    n = len(df)
    pos_pct = (df["label"].eq("positive").sum() / n) * 100.0
    neu_pct = (df["label"].eq("neutral").sum() / n) * 100.0
    neg_pct = (df["label"].eq("negative").sum() / n) * 100.0

    if overall_score > 0.10:
        overall_label = "positive"
    elif overall_score < -0.10:
        overall_label = "negative"
    else:
        overall_label = "neutral"

    if _have_neo4j_driver():
        # Persistence is best-effort, matching the historical agent: a graph
        # outage must not discard the sentiment that was just computed.
        try:
            _store_news_to_neo4j(company, ticker, df)
        except Exception:
            logger.warning("Neo4j news write failed for %s", ticker, exc_info=True)

    return {
        "ticker": ticker,
        "company": company,
        "n_articles": int(n),
        "overall_label": overall_label,
        "overall_score": float(overall_score),
        "pos_pct": float(pos_pct),
        "neu_pct": float(neu_pct),
        "neg_pct": float(neg_pct),
        "df": df,
    }


# ---- Retriever for snippets ----
def _news_snippet(text: str) -> str:
    """Single-line, at most 220-character rendering of an article text."""
    s = text.replace("\n", " ").strip()
    if len(s) > 220:
        s = s[:217] + "..."
    return s


class NewsRetriever:
    """Inner-product FAISS index over normalized embeddings of article texts."""

    def __init__(self, docs):
        self.docs = docs
        if not docs:
            self.index = None
            return
        X = _embedder.encode(docs, convert_to_numpy=True, normalize_embeddings=True, batch_size=32)
        self.index = faiss.IndexFlatIP(X.shape[1])
        self.index.add(X)
        self.X = X

    def query(self, q, top_k=8):
        """Return up to ``top_k`` ``(snippet, score)`` pairs, best first."""
        if not self.index or not self.docs:
            return []
        qv = _embedder.encode([q], convert_to_numpy=True, normalize_embeddings=True)
        D, I = self.index.search(qv, top_k)
        hits = []
        for j, i in enumerate(I[0]):
            if i == -1:  # FAISS padding when fewer than top_k docs exist
                continue
            hits.append((_news_snippet(self.docs[i]), float(D[0][j])))
        return hits


# ---- Tools ----
from langchain.tools import tool

_GLOBAL_NEWS = {"latest": {}}


@tool
def fetch_analyze_news(ticker: str) -> str:
    """Resolve company, fetch & score news, write Neo4j, build retriever; return summary JSON."""
    company = ticker
    try:
        candidates = yahoo_symbol_search(ticker)
    except Exception:
        candidates = []
    if candidates:
        # Prefer the search hit whose symbol is exactly the ticker; the first
        # hit can be a different listing.
        chosen = next(
            (c for c in candidates if (c.get("symbol") or "").upper() == ticker.upper()),
            candidates[0],
        )
        company = chosen.get("longname") or chosen.get("shortname") or ticker

    out = analyze_and_store_news(company, ticker)
    df = out["df"]
    if df.empty:
        docs = []
    else:
        docs = [(t + ". " + d).strip() for t, d in zip(df["title"].tolist(), df["description"].tolist())]
    retriever = NewsRetriever(docs)
    _GLOBAL_NEWS["latest"][ticker] = {"summary": out, "df": df, "retriever": retriever}
    keys = ["ticker", "company", "n_articles", "overall_label", "overall_score", "pos_pct", "neu_pct", "neg_pct"]
    return json.dumps({k: out[k] for k in keys})


@tool
def show_sentiment_breakdown(ticker: str) -> str:
    """Markdown table of recent headlines (top 12)."""
    if ticker not in _GLOBAL_NEWS["latest"]:
        return "Run fetch_analyze_news first."
    df = _GLOBAL_NEWS["latest"][ticker]["summary"]["df"]
    if df.empty:
        return "_No recent articles found._"
    tbl = df[["publishedAt", "domain", "source", "label", "title"]].head(12).copy()
    try:
        tbl["publishedAt"] = pd.to_datetime(tbl["publishedAt"]).dt.strftime("%Y-%m-%d")
    except Exception:
        pass
    return tbl.to_markdown(index=False)


@tool
def neo4j_check_news_count(ticker: str) -> str:
    """How many articles stored in Neo4j."""
    if not _have_neo4j_driver():
        return "Neo4j check skipped (ENABLE_NEO4J=False)."
    # A data check only: report connection problems instead of raising.
    try:
        with driver.session() as session:
            res = session.run(
                "MATCH (:Stock {symbol:$s})-[:HAS_NEWS]->(a:Article) RETURN count(a) AS c",
                s=ticker,
            ).single()
    except Exception as exc:
        return f"Neo4j check failed: {exc}"
    c = int(res["c"]) if res else 0
    return f"Neo4j has {c} article nodes for {ticker}."


@tool("retrieve_news_evidence")
def retrieve_news_evidence_tool(query: str) -> str:
    """INPUT: 'TICKER | question' -> date · domain · snippet bullets."""
    if "|" in query:
        ticker, question = [x.strip() for x in query.split("|", 1)]
    else:
        ticker, question = query.strip(), "latest sentiment drivers"
    if ticker not in _GLOBAL_NEWS["latest"]:
        return "Run fetch_analyze_news first."
    retriever = _GLOBAL_NEWS["latest"][ticker]["retriever"]
    hits = retriever.query(question, top_k=6) if retriever else []
    if not hits:
        return "_No evidence available._"

    # attach meta (date/domain) if we can match.  Keys go through the same
    # snippet normalization the retriever applies; keying on the raw text
    # would miss every article longer than the snippet limit.
    df = _GLOBAL_NEWS["latest"][ticker]["summary"]["df"]
    meta = {}
    for _, r in df.iterrows():
        key = _news_snippet((r["title"] + ". " + r.get("description", "")).strip())
        published = r["publishedAt"]
        meta[key] = {
            "date": pd.to_datetime(published).strftime("%Y-%m-%d") if pd.notna(published) else "",
            "domain": r.get("domain", ""),
        }

    bullets = []
    for txt, _ in hits:
        m = meta.get(txt, {})
        bullets.append(f"- {m.get('date','')} · {m.get('domain','')} · {txt}")
    return "\n".join(bullets)


# ---- LangGraph flow ----
from langgraph.graph import StateGraph, END
from typing import TypedDict


class NewsState(TypedDict, total=False):
    """State passed between the nodes of the news-sentiment graph."""
    ticker: str
    fetch_json: str
    breakdown_md: str
    neo4j_line: str
    evidence_md: str
    final_markdown: str


def n_n_fetch(s: NewsState) -> NewsState:
    """Graph node: fetch and score news."""
    return {"fetch_json": fetch_analyze_news.invoke(s["ticker"])}


def n_n_breakdown(s: NewsState) -> NewsState:
    """Graph node: render the headline table."""
    return {"breakdown_md": show_sentiment_breakdown.invoke(s["ticker"])}


def n_n_neo(s: NewsState) -> NewsState:
    """Graph node: count stored articles as a data check."""
    return {"neo4j_line": neo4j_check_news_count.invoke(s["ticker"])}


def n_n_evidence(s: NewsState) -> NewsState:
    """Graph node: retrieve the snippets closest to the sentiment question."""
    q = f"{s['ticker']} | biggest drivers of sentiment"
    return {"evidence_md": retrieve_news_evidence_tool.invoke(q)}


def _pros_cons_from_summary(js):
    """Derive rule-based pros/cons bullets from the news summary dict."""
    # With no articles every percentage is 0; "few positive headlines" would
    # then describe missing data rather than tone.
    if js.get("n_articles") == 0:
        return (
            ["No strong positive skew detected."],
            ["No recent articles were available, so tone could not be assessed."],
        )

    pros, cons = [], []
    label = js.get("overall_label", "neutral")
    pos = float(js.get("pos_pct", 0))
    neu = float(js.get("neu_pct", 0))
    neg = float(js.get("neg_pct", 0))

    if label == "positive":
        pros.append("Net positive media tone in the recent period.")
    if pos >= 40:
        pros.append("High share of positive headlines.")
    if neu >= 40:
        pros.append("Balanced coverage (many neutral headlines).")

    if label == "negative":
        cons.append("Net negative media tone in the recent period.")
    if neg >= 40:
        cons.append("High share of negative headlines.")
    if pos <= 20:
        cons.append("Few positive headlines recently.")

    if not pros:
        pros.append("No strong positive skew detected.")
    if not cons:
        cons.append("No strong negative skew detected.")
    return pros, cons


def n_n_write(s: NewsState) -> NewsState:
    """Graph node: assemble the final markdown sentiment report."""
    try:
        js = json.loads(s.get("fetch_json", "{}"))
    except Exception:
        js = {}
    t = s["ticker"]
    label = js.get("overall_label", "neutral")
    score = float(js.get("overall_score", 0.0))
    pos = float(js.get("pos_pct", 0.0))
    neu = float(js.get("neu_pct", 0.0))
    neg = float(js.get("neg_pct", 0.0))

    # Safer prompt that does not invent metrics
    system_msg = (
        "You are a cautious summarizer. Use ONLY the provided numbers: overall_label, overall_score, "
        "pos_pct, neu_pct, neg_pct. Do not invent or reinterpret metrics (e.g., do not call a percent a score), "
        "and do not mention returns."
    )
    user_msg = (
        f"Write a 2–3 sentence summary for {t}. "
        f"Overall={label}, Score={score:.2f}, Mix: +{pos:.1f}% / neutral {neu:.1f}% / -{neg:.1f}%."
    )
    try:
        summary = chat_summarize(system_msg, user_msg)
    except Exception:
        summary = (
            f"Coverage for {t} appears {label}. "
            f"Headline mix: {pos:.1f}% positive, {neu:.1f}% neutral, {neg:.1f}% negative (score {score:.2f})."
        )

    pros, cons = _pros_cons_from_summary(js)
    lines = [
        f"# {t} — Current News Sentiment ({NEWS_LOOKBACK_DAYS}d)",
        summary,
        "\n## Sentiment Snapshot",
        f"- **Overall:** {label} (score: {score:.2f})",
        f"- **Headline mix:** {pos:.1f}% positive · {neu:.1f}% neutral · {neg:.1f}% negative",
        "\n## Recent Headlines (sample)",
        s.get("breakdown_md", "_No headlines._"),
        "\n## Evidence (semantic matches)",
        s.get("evidence_md", "_No evidence._"),
        "\n## Pros (based on tone)",
    ]
    lines += [f"- {p}" for p in pros]
    lines.append("\n## Cons (based on tone)")
    lines += [f"- {c}" for c in cons]
    lines.append("\n### Data Checks")
    lines.append(f"- {s.get('neo4j_line','')}")
    lines.append("\n*This is not financial advice.*")
    return {"final_markdown": "\n".join(lines)}


wf_n = StateGraph(NewsState)
wf_n.add_node("fetch", n_n_fetch)
wf_n.add_node("breakdown", n_n_breakdown)
wf_n.add_node("neo", n_n_neo)
wf_n.add_node("evidence", n_n_evidence)
wf_n.add_node("final", n_n_write)
wf_n.set_entry_point("fetch")
wf_n.add_edge("fetch", "breakdown")
wf_n.add_edge("breakdown", "neo")
wf_n.add_edge("neo", "evidence")
wf_n.add_edge("evidence", "final")
wf_n.add_edge("final", END)
news_agent = wf_n.compile()


# Helper to run by already-resolved ticker
def run_news_agent_ticker(ticker: str) -> str:
    """Run the news graph for an already-resolved ticker and return its Markdown."""
    out = news_agent.invoke({"ticker": ticker})
    return out.get("final_markdown", "")


def run_news_agent(user_input: str):
    """Resolve free text to a ticker and run the news graph.

    Returns:
        ``(markdown, ticker)`` where ``markdown`` is ``""`` if the graph
        produced no ``final_markdown``.
    """
    ticker = resolve_to_ticker(user_input)
    out = news_agent.invoke({"ticker": ticker})
    return out.get("final_markdown", ""), ticker


# Cell 7 — Portfolio Optimization Agent
from scipy.optimize import minimize
import yfinance as yf

# Shared scratch space written by the portfolio tools and read by later graph nodes.
_P_GLOBAL = {"latest": {}}
CORE_ETFS = ["SPY", "VTI", "VXUS", "BND"]
WMAX = 0.30            # per-asset weight cap
MIN_W_SOFT = 0.03      # soft minimum weight per asset
LAMBDA_CONCEN = 0.02   # concentration (sum of squared weights) penalty
MAX_TICKERS_TOTAL = 30
_MIN_PRICE_ROWS = 60   # minimum number of observations a ticker needs to be kept
_STOPWORDS = {"I","A","AN","AND","ARE","AM","AS","AT","BE","BY","CAN","FOR","FROM","HAD","HAS","HAVE","HE","HER",
              "HIM","HIS","IF","IN","INTO","IS","IT","ITS","ME","MY","OF","ON","OR","OUR","SO","SHE","THAT","THE","THEIR",
              "THEM","THEN","THERE","THESE","THEY","THIS","TO","UP","US","WAS","WE","WITH","YOU","YOUR","FEW","MANY","MOST",
              "SOME","ANY","ALL"}


def extract_tickers(text: str):
    """Extract ticker-like tokens from free text.

    Every 1-5 letter token (optionally with a ``.X`` class suffix) of the
    upper-cased text is a candidate. English stop words are removed *before*
    validation: several of them ("A", "FOR", "ALL", "SO", ...) are real
    symbols, so validating first would turn ordinary sentence words into
    holdings. The trade-off is that those symbols cannot be entered as
    free text.

    Candidates that Yahoo's search returns as an exact symbol match are
    preferred; if none validates (e.g. the search is unreachable) the
    stop-word-filtered candidates are returned instead.

    Returns:
        A sorted list of at most ``MAX_TICKERS_TOTAL`` symbols.
    """
    raw = re.findall(r"\b[A-Z]{1,5}(?:\.[A-Z])?\b", (text or "").upper())
    cands = sorted(set(raw) - _STOPWORDS)
    validated = []
    for c in cands:
        try:
            matches = yahoo_symbol_search(c) or []
        except Exception:
            # Validation is best-effort; an unexpected search failure for one
            # token must not discard the tokens that did validate.
            continue
        # Yahoo occasionally returns quotes without a symbol; treat as no match.
        if any((d.get("symbol") or "").upper() == c for d in matches):
            validated.append(c)
    if validated:
        return validated[:MAX_TICKERS_TOTAL]
    return cands[:MAX_TICKERS_TOTAL]


CATEGORY_MAP = {
    "megacap tech": ["AAPL","MSFT","GOOGL","AMZN","NVDA","META"],
    "semiconductors": ["NVDA","AMD","AVGO","QCOM","TSM","INTC"],
    "cloud saas": ["CRM","NOW","ADBE","ORCL","DDOG","SNOW"],
    "ai": ["NVDA","MSFT","GOOGL","AMZN","META","AVGO"],
    "ev": ["TSLA","RIVN","LCID","NIO","GM","F"],
    "banks": ["JPM","BAC","WFC","C","GS","MS"],
    "healthcare": ["UNH","JNJ","PFE","MRK","LLY","ABBV"],
    "staples": ["PG","KO","PEP","WMT","COST","MDLZ"],
    "energy": ["XOM","CVX","COP","SLB","EOG","PSX"],
    "industrials": ["CAT","BA","UNP","GE","HON","DE"],
    "utilities": ["NEE","DUK","SO","D","AEP","EXC"],
    "reit": ["PLD","AMT","CCI","SPG","O","EQIX"],
    "broad etf": ["SPY","VTI","QQQ","VOO","VXUS","BND"],
}


def detect_category(text: str):
    """Return the ``CATEGORY_MAP`` key mentioned in ``text``, or ``""``.

    Keys are matched as whole words (an optional plural "s" is accepted) so
    that short keys such as "ai" or "ev" no longer fire inside unrelated
    words like "maintain" or "review". The loose "tech" / "semis" / "staple"
    aliases are still matched as substrings.
    """
    t = (text or "").lower()
    for k in CATEGORY_MAP:
        if re.search(rf"\b{re.escape(k)}s?\b", t):
            return k
    if "tech" in t:
        return "megacap tech"
    if "semis" in t:
        return "semiconductors"
    if "staple" in t:
        return "staples"
    return ""


def resolve_input(user_text: str):
    """Turn a free-text request into ``(tickers, category)``.

    Explicit tickers win; otherwise a detected category is returned with no
    tickers; otherwise the request (minus filler words) is sent to Yahoo's
    search and the first hit is used. Returns ``([], "")`` if nothing resolves.
    """
    tix = extract_tickers(user_text)
    cat = detect_category(user_text)
    if tix:
        return sorted(set(tix))[:MAX_TICKERS_TOTAL], cat
    if cat:
        return [], cat
    token = re.sub(r"can i invest in|suggest|recommend|stocks|portfolio|optimi[sz]e", "",
                   user_text, flags=re.I).strip()
    if token:
        m = yahoo_symbol_search(token)
        if m:
            return [m[0]["symbol"]], ""
    return [], ""


def _clean_price_frame(adj, tickers):
    """Keep requested tickers with enough history, then align them on common dates.

    Columns with fewer than ``_MIN_PRICE_ROWS`` observations are dropped
    *before* the row-wise ``dropna`` so that one short or empty column cannot
    erase the history of every other ticker.
    """
    wanted = list(dict.fromkeys(tickers))  # de-duplicate, keep order
    if isinstance(adj, pd.Series):
        # A single-symbol download can come back as a Series named after the
        # price field; label it with the ticker so the column filter keeps it.
        adj = adj.to_frame(name=wanted[0]) if len(wanted) == 1 else adj.to_frame()
    adj = adj.dropna(how="all")
    cols = [c for c in wanted
            if c in adj.columns and adj[c].notna().sum() >= _MIN_PRICE_ROWS]
    if not cols:
        return pd.DataFrame()
    return adj[cols].ffill().dropna()


def fetch_prices(tickers, lookback_days=PRICE_LOOKBACK_DAYS):
    """Download adjusted close prices for ``tickers`` from Yahoo Finance.

    A single batched download is tried first. If it yields nothing usable,
    each ticker is downloaded on its own and failures are skipped.

    Args:
        tickers: iterable of symbols.
        lookback_days: calendar days of history (10 extra days are requested).

    Returns:
        DataFrame indexed by date with one column per ticker that has at
        least ``_MIN_PRICE_ROWS`` observations; empty if nothing was fetched.
    """
    if not tickers:
        return pd.DataFrame()
    end = today_utc_date()
    start = end - timedelta(days=lookback_days + 10)

    try:
        batch_raw = yf.download(tickers, start=start.isoformat(), end=end.isoformat(),
                                auto_adjust=False, group_by="column", progress=False, threads=True)
        if isinstance(batch_raw, pd.DataFrame) and not batch_raw.empty:
            adj = batch_raw["Adj Close"] if "Adj Close" in batch_raw.columns else batch_raw["Close"]
            df = _clean_price_frame(adj, tickers)
        else:
            df = pd.DataFrame()
    except Exception:
        # Best-effort: any batch failure falls through to per-ticker downloads.
        df = pd.DataFrame()

    if df.empty or df.shape[1] < 1:
        series_map = {}
        for t in dict.fromkeys(tickers):
            try:
                r = yf.download(t, start=start.isoformat(), end=end.isoformat(),
                                auto_adjust=False, progress=False)
                if r is None or r.empty:
                    continue
                adj = r["Adj Close"] if "Adj Close" in r.columns else r.get("Close")
                if isinstance(adj, pd.DataFrame):
                    # Recent yfinance versions return (field, ticker) MultiIndex
                    # columns even for one symbol; reduce to a plain Series.
                    adj = adj.iloc[:, 0] if adj.shape[1] else None
                if adj is None or adj.empty:
                    continue
                adj = adj.dropna()
                if adj.shape[0] < _MIN_PRICE_ROWS:
                    continue
                series_map[t] = adj
            except Exception:
                # Skip tickers that fail to download or parse.
                continue
        if series_map:
            df = pd.DataFrame(series_map).dropna(how="all").ffill().dropna()
        else:
            df = pd.DataFrame()
    return df
def compute_risk_metrics(price_df: pd.DataFrame):
    """Compute annualised return, volatility, Sharpe and the return correlation matrix.

    Args:
        price_df: price history, one column per ticker.

    Returns:
        ``{"metrics": DataFrame, "corr": DataFrame, "rets": DataFrame}``.
        ``metrics`` has columns ``AnnReturn%``, ``AnnVol%`` and ``Sharpe``
        (rounded to 2 decimals, sorted by return). All three frames are empty
        when there are no prices or no returns.
    """
    if price_df.empty:
        return {"metrics": pd.DataFrame(), "corr": pd.DataFrame(), "rets": pd.DataFrame()}
    rets = price_df.pct_change().dropna()
    if rets.empty:
        return {"metrics": pd.DataFrame(), "corr": pd.DataFrame(), "rets": pd.DataFrame()}
    ann_ret = (1 + rets.mean()) ** 252 - 1
    ann_vol = rets.std() * np.sqrt(252)
    sharpe = (ann_ret - RISK_FREE_RATE) / (ann_vol + 1e-12)
    metrics = pd.DataFrame({"AnnReturn%": (ann_ret * 100).round(2),
                            "AnnVol%": (ann_vol * 100).round(2),
                            "Sharpe": sharpe.round(2)}).sort_values("AnnReturn%", ascending=False)
    corr = rets.corr()
    return {"metrics": metrics, "corr": corr, "rets": rets}


# ---- Ticker-level news sentiment (uses FinBERT we already loaded) ----
def fetch_sentiment_for_ticker(ticker: str):
    """Fetch recent headlines for ``ticker`` and score them with ``sentiment_pipe``.

    NewsAPI is queried first (when ``NEWSAPI_KEY`` is set); GNews is the
    fallback when that yields nothing. Each article is classified, and the
    overall score is the recency-weighted mean of ``p_pos - p_neg`` with
    weight ``exp(-0.25 * age_days)``. Articles without a parseable date are
    treated as age 0.

    Returns:
        dict with ``ticker``, ``n_articles``, ``overall_label``
        ("positive" / "negative" / "neutral", or "unknown" with no articles),
        ``overall_score`` and ``df``; plus ``pos_pct`` / ``neu_pct`` /
        ``neg_pct`` when at least one article was found.
    """
    to_date = today_utc_date().isoformat()
    from_date = days_ago(NEWS_LOOKBACK_DAYS).isoformat()
    rows = []
    if NEWSAPI_KEY:
        url = "https://newsapi.org/v2/everything"
        params = {"q": ticker, "language": "en", "from": from_date, "to": to_date,
                  "sortBy": "publishedAt", "pageSize": min(MAX_NEWS_PER_TICKER, 100),
                  "apiKey": NEWSAPI_KEY}
        try:
            r = requests.get(url, params=params, timeout=15)
            if r.status_code == 200:
                data = r.json()
                for a in data.get("articles") or []:
                    rows.append({"title": a.get("title") or "",
                                 "description": a.get("description") or "",
                                 "source": (a.get("source") or {}).get("name") or "",
                                 "publishedAt": a.get("publishedAt") or "",
                                 "url": a.get("url") or ""})
        except (requests.RequestException, ValueError, AttributeError, TypeError):
            # Network error or malformed payload: fall through to GNews.
            pass
    if not rows:
        try:
            g = GNews(language='en', country='US', period=f"{NEWS_LOOKBACK_DAYS}d",
                      max_results=MAX_NEWS_PER_TICKER)
            hits = g.get_news(ticker)
            for h in hits or []:
                rows.append({"title": h.get("title") or "",
                             "description": h.get("description") or "",
                             "source": (h.get("publisher") or {}).get("title") or "",
                             "publishedAt": h.get("published date") or "",
                             "url": h.get("url") or ""})
        except Exception:
            # Third-party scraper; sentiment is best-effort, so report "unknown" below.
            pass
    if not rows:
        return {"ticker": ticker, "n_articles": 0, "overall_label": "unknown",
                "overall_score": 0.0, "df": pd.DataFrame()}

    df = pd.DataFrame(rows).fillna("")

    def _to_ts(x):
        # Parse per element: sources use different date formats.
        try:
            return pd.to_datetime(x, utc=True)
        except (ValueError, TypeError, OverflowError):
            return pd.NaT

    df["publishedAt"] = df["publishedAt"].apply(_to_ts)
    df = (df.dropna(subset=["title", "url"])
            .drop_duplicates(subset=["url"])
            .drop_duplicates(subset=["title"])
            .copy())
    df = df.sort_values("publishedAt", ascending=False).head(MAX_NEWS_PER_TICKER).reset_index(drop=True)

    labels, pos_p, neu_p, neg_p, w = [], [], [], [], []
    now = pd.Timestamp.now(tz="UTC")
    for _, r in df.iterrows():
        title, desc = r["title"], r.get("description", "")
        # Only build text when there is something to classify; otherwise the
        # bare ". " separator would be sent to the model.
        text = (title + ". " + desc).strip() if (title.strip() or desc.strip()) else ""
        if not text:
            label, ppos, pneu, pneg = "neutral", 0.0, 1.0, 0.0
        else:
            out = sentiment_pipe(text[:512])[0]
            probs = {d["label"].lower(): float(d["score"]) for d in out}
            ppos, pneu, pneg = probs.get("positive", 0.0), probs.get("neutral", 0.0), probs.get("negative", 0.0)
            label = "positive" if ppos > max(pneu, pneg) else ("negative" if pneg > max(ppos, pneu) else "neutral")
        ts = r["publishedAt"]
        # Undated articles count as fresh (age 0); future-dated ones are clamped to 0.
        age_days = 0.0 if pd.isna(ts) else max(0.0, (now - ts).total_seconds() / 86400.0)
        w.append(math.exp(-0.25 * age_days))
        labels.append(label)
        pos_p.append(ppos)
        neu_p.append(pneu)
        neg_p.append(pneg)

    df["label"] = labels
    df["p_pos"] = pos_p
    df["p_neu"] = neu_p
    df["p_neg"] = neg_p
    df["w"] = w
    df["signed"] = df["w"] * (df["p_pos"] - df["p_neg"])
    score = df["signed"].sum() / (df["w"].sum() + 1e-9)
    n = len(df)
    pos_pct = (df["label"].eq("positive").sum() / n) * 100.0
    neu_pct = (df["label"].eq("neutral").sum() / n) * 100.0
    neg_pct = (df["label"].eq("negative").sum() / n) * 100.0
    label = "positive" if score > 0.10 else ("negative" if score < -0.10 else "neutral")
    return {"ticker": ticker, "n_articles": n, "overall_label": label, "overall_score": float(score),
            "pos_pct": float(pos_pct), "neu_pct": float(neu_pct), "neg_pct": float(neg_pct), "df": df}


# ---- FAISS facts for portfolio evidence ----
class FactRetrieverP:
    """Inner-product FAISS index over a list of fact strings embedded with ``_embedder``."""

    def __init__(self, facts):
        self.facts = facts
        if not facts:
            self.index = None
            return
        X = _embedder.encode(facts, convert_to_numpy=True, normalize_embeddings=True, batch_size=64)
        self.index = faiss.IndexFlatIP(X.shape[1])
        self.index.add(X)

    def query(self, q, top_k=8):
        """Return up to ``top_k`` ``(fact, score)`` pairs most similar to ``q``."""
        if self.index is None or not self.facts:
            return []
        k = min(int(top_k), len(self.facts))
        if k <= 0:
            return []
        qv = _embedder.encode([q], convert_to_numpy=True, normalize_embeddings=True)
        D, I = self.index.search(qv, k)
        # FAISS pads missing results with label -1, which would otherwise
        # index the *last* fact; skip those slots.
        return [(self.facts[i], float(D[0][j])) for j, i in enumerate(I[0]) if i >= 0]


# ---- Neo4j snapshot (optional) ----
def neo4j_store_snapshot(tickers, metrics_df, sentiments):
    """Write per-ticker metrics and sentiment to Neo4j via the module-level ``driver``.

    Args:
        tickers: universe symbols (not used by the queries).
        metrics_df: frame indexed by ticker with ``AnnReturn%``, ``AnnVol%``, ``Sharpe``.
        sentiments: mapping ticker -> sentiment dict from ``fetch_sentiment_for_ticker``.

    Returns:
        A human-readable status line.
    """
    if not _have_neo4j_driver():
        return "Neo4j write skipped (ENABLE_NEO4J=False)."
    md = metrics_df.rename(columns={"AnnReturn%": "AnnReturn", "AnnVol%": "AnnVol"}).copy()
    # Name the index explicitly: reset_index() only yields an "index" column
    # for an unnamed index, and the Cypher below requires r.Ticker.
    rows_metrics = md.rename_axis("Ticker").reset_index().to_dict(orient="records")
    rows_sent = []
    for t, js in sentiments.items():
        rows_sent.append({"ticker": t,
                          "label": js.get("overall_label", "unknown"),
                          "score": float(js.get("overall_score", 0.0)),
                          "pos_pct": float(js.get("pos_pct", 0.0)),
                          "neu_pct": float(js.get("neu_pct", 0.0)),
                          "neg_pct": float(js.get("neg_pct", 0.0))})
    with driver.session() as session:
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (s:Stock) REQUIRE s.symbol IS UNIQUE")
        session.run("""
            UNWIND $rows AS r
            MERGE (s:Stock {symbol:r.Ticker})
            SET s.AnnReturn=toFloat(r.AnnReturn), s.AnnVol=toFloat(r.AnnVol), s.Sharpe=toFloat(r.Sharpe)
        """, rows=rows_metrics)
        session.run("""
            UNWIND $rows AS r
            MATCH (s:Stock {symbol:r.ticker})
            MERGE (s)-[rel:HAS_SENTIMENT]->(m:Sentiment {date: date()})
            SET rel.label=r.label, rel.score=r.score, rel.pos_pct=r.pos_pct, rel.neu_pct=r.neu_pct, rel.neg_pct=r.neg_pct
        """, rows=rows_sent)
    return f"Wrote {len(rows_metrics)} metric nodes and {len(rows_sent)} sentiment relations."


# ---- Tools ----
from langchain.tools import tool


@tool
def build_universe(input_text: str) -> str:
    """Build the initial security universe from free text.
    Input: free-form sentence with tickers and/or a theme (e.g., "optimize AAPL MSFT TSLA" or "semiconductors").
    Returns: JSON string {"holdings": [...], "category": "", "universe": [...]}
    """
    holdings, category = resolve_input(input_text)
    universe = set()
    if holdings:
        universe.update(holdings)
        universe.update(CORE_ETFS)
        if category:
            universe.update(CATEGORY_MAP.get(category, []))
    elif category:
        universe.update(CATEGORY_MAP.get(category, []))
        universe.update(CORE_ETFS)
    else:
        universe.update(CORE_ETFS + ["AAPL", "MSFT", "NVDA", "AMZN"])

    if len(universe) > MAX_TICKERS_TOTAL:
        # When truncating, keep the user's holdings first; a plain alphabetical
        # cut could drop them in favour of filler names.
        held = set(holdings)
        ordered = sorted(universe)
        keep = [t for t in ordered if t in held] + [t for t in ordered if t not in held]
        universe = sorted(keep[:MAX_TICKERS_TOTAL])
    else:
        universe = sorted(universe)

    _P_GLOBAL["latest"]["holdings"] = holdings
    _P_GLOBAL["latest"]["category"] = category
    _P_GLOBAL["latest"]["universe"] = universe
    return json.dumps({"holdings": holdings, "category": category, "universe": universe})


def _avg_corr_to_holdings(corr: pd.DataFrame, holding_tix, t):
    """Mean absolute correlation of ``t`` with the holdings, or NaN if unavailable."""
    if not isinstance(corr, pd.DataFrame) or corr.empty or not holding_tix:
        return np.nan
    vals = []
    for h in holding_tix:
        if (t in corr.index) and (h in corr.columns):
            try:
                vals.append(abs(float(corr.loc[t, h])))
            except (TypeError, ValueError):
                # Non-scalar or non-numeric cell (e.g. duplicate labels): skip it.
                pass
    return float(np.mean(vals)) if vals else np.nan


@tool
def score_universe(_: str="") -> str:
    """Score the universe by diversification & news tone; compute risk tables and store snapshot.
    Uses correlation vs. current holdings and FinBERT news sentiment to rank candidates.
    Side effects: stores metrics/sentiment to Neo4j (if enabled).
    Returns: JSON string {"n_universe": int, "n_holdings": int, "top_candidates": [...], "neo4j": ""}
    """
    universe = _P_GLOBAL["latest"].get("universe", [])
    holdings = _P_GLOBAL["latest"].get("holdings", [])
    if not universe:
        return json.dumps({"error": "empty universe"})
    px = fetch_prices(universe, PRICE_LOOKBACK_DAYS)
    if px.empty:
        return json.dumps({"error": "no price data"})
    risk = compute_risk_metrics(px)
    metrics, corr, rets = risk["metrics"], risk["corr"], risk["rets"]

    sentiments = {}
    for t in universe:
        try:
            sentiments[t] = fetch_sentiment_for_ticker(t)
        except Exception:
            # Sentiment is optional; a failure for one ticker must not abort scoring.
            sentiments[t] = {"ticker": t, "n_articles": 0, "overall_label": "unknown", "overall_score": 0.0}

    # Only tickers with price data are scored. An unpriced ticker has no
    # correlation, which would otherwise count as "perfectly diversifying"
    # and push it to the top of the recommendations.
    priced = [t for t in universe if t in px.columns]
    scores = {}
    for t in priced:
        avg_corr = _avg_corr_to_holdings(corr, holdings, t)
        sent = float(sentiments[t].get("overall_score", 0.0))
        # 60% diversification (1 - |corr|; NaN treated as 0), 40% sentiment mapped to [0, 1].
        scores[t] = 0.6 * (1.0 - (0.0 if np.isnan(avg_corr) else avg_corr)) + 0.4 * ((sent + 1.0) / 2.0)

    _P_GLOBAL["latest"].update({"px": px, "metrics": metrics, "corr": corr, "rets": rets,
                                "sentiments": sentiments, "scores": scores})

    facts = []
    for t in metrics.index:
        r = metrics.loc[t]
        s = sentiments.get(t, {})
        facts.append(f"{t} annual return: {r['AnnReturn%']:.2f}%")
        facts.append(f"{t} annual volatility: {r['AnnVol%']:.2f}%")
        facts.append(f"{t} Sharpe ratio: {r['Sharpe']:.2f}")
        facts.append(f"{t} news sentiment score (recent): {s.get('overall_score',0.0):.3f} label {s.get('overall_label','unknown')}")
    _P_GLOBAL["latest"]["retriever"] = FactRetrieverP(facts)

    universe_ranked = sorted(priced, key=lambda x: scores.get(x, 0.0), reverse=True)
    extras = [t for t in universe_ranked if t not in holdings]
    need = max(0, 8 - len(holdings))  # aim for roughly 8 names in total
    recs = extras[:need]

    try:
        neo_msg = neo4j_store_snapshot(universe, metrics, sentiments)
    except Exception as e:
        # The snapshot is an optional side effect; report the failure instead
        # of discarding the scoring that already succeeded.
        neo_msg = f"Neo4j write failed: {e}"
    payload = {"n_universe": len(universe), "n_holdings": len(holdings), "top_candidates": recs, "neo4j": neo_msg}
    return json.dumps(payload)


def _mean_var_opt(rets_df: pd.DataFrame, risk_free=RISK_FREE_RATE, wmax=WMAX, lambda_conc=LAMBDA_CONCEN):
    """Maximise Sharpe (minus a concentration penalty) with SLSQP.

    Args:
        rets_df: periodic returns, one column per asset, at least 40 rows.
        risk_free: annual risk-free rate.
        wmax: per-asset weight cap. Relaxed to ``1/N`` when ``N * wmax < 1``,
            because weights that must sum to 1 cannot all stay below the cap.
        lambda_conc: coefficient of the ``sum(w**2)`` penalty.

    Returns:
        ``(weights, ann_return, ann_vol, sharpe)``.

    Raises:
        RuntimeError: with fewer than 40 rows or when SLSQP does not converge.
    """
    R = rets_df.values
    if R.shape[0] < 40:
        raise RuntimeError("Too little data for optimization.")
    mu = np.mean(R, axis=0) * 252.0
    N = len(mu)
    # atleast_2d: np.cov returns a 0-d array for a single asset.
    Sigma = np.atleast_2d(np.cov(R, rowvar=False)) * 252.0
    Sigma = Sigma + np.eye(N) * 1e-6  # small ridge for numerical stability
    x0 = np.ones(N) / N

    def neg_sharpe(w):
        vol = np.sqrt(max(1e-12, w @ Sigma @ w))
        ret = w @ mu
        return -(ret - risk_free) / vol

    def objective(w):
        return neg_sharpe(w) + lambda_conc * np.sum(w ** 2)

    cap = max(wmax, 1.0 / N)
    min_w = MIN_W_SOFT if (N * MIN_W_SOFT) < 1.0 else 0.0
    bounds = [(min_w, cap)] * N
    cons = [{"type": "eq", "fun": lambda w: np.sum(w) - 1.0}]
    res = minimize(objective, x0, method="SLSQP", bounds=bounds, constraints=cons,
                   options={"maxiter": 700, "ftol": 1e-9, "disp": False})
    if (not res.success) or (np.any(np.isnan(res.x))):
        raise RuntimeError("SLSQP failed.")
    w = res.x.copy()
    w[w < 1e-3] = 0.0
    w = w / (w.sum() + 1e-12)
    vol = float(np.sqrt(max(1e-12, w @ Sigma @ w)))
    ret = float(w @ mu)
    sharpe = (ret - risk_free) / (vol + 1e-12)
    return w, ret, vol, sharpe


def _cap_weights(w: pd.Series, cap: float) -> pd.Series:
    """Scale ``w`` to sum to 1 with no entry above ``cap`` (relaxed to 1/N if infeasible).

    Excess above the cap is redistributed proportionally among the entries
    still below it, repeating until nothing exceeds the cap.
    """
    w = w / w.sum()
    cap = max(cap, 1.0 / len(w))
    for _ in range(len(w)):
        over = w > cap + 1e-12
        if not over.any():
            break
        excess = float((w[over] - cap).sum())
        w[over] = cap
        room = w < cap - 1e-12
        if not room.any() or w[room].sum() <= 0:
            break
        w[room] = w[room] + excess * w[room] / w[room].sum()
    return w


@tool
def optimize_portfolio(objective: str="max_sharpe") -> str:
    """Optimize portfolio weights (max Sharpe with caps & soft-min weights).
    Uses mean-variance with per-asset cap (default 30%) and light concentration penalty.
    Returns: Markdown table with weights (%) keyed by ticker.
    """
    latest = _P_GLOBAL["latest"]
    holdings = latest.get("holdings", [])
    scores = latest.get("scores", {})
    px = latest.get("px", pd.DataFrame())
    # Drop stats from any earlier run so the report never shows stale numbers
    # next to weights produced by the fallback below.
    latest.pop("opt_summary", None)
    if px.empty:
        return "_No data for optimization._"

    ranked = sorted(scores, key=lambda t: scores[t], reverse=True)
    chosen = list(holdings)
    for t in ranked:
        if t not in chosen:
            chosen.append(t)
        if len(chosen) >= min(12, len(ranked)):
            break

    tickers = [t for t in chosen if t in px.columns]
    sub_px = px[tickers].dropna()
    if sub_px.empty:
        return "_No overlapping price history._"
    rets = sub_px.pct_change().dropna()
    if rets.empty:
        return "_No overlapping price history._"

    try:
        w, ann_ret, ann_vol, sharpe = _mean_var_opt(rets)
    except Exception:
        # Optimiser unavailable or did not converge: fall back to capped
        # inverse-volatility weights (equal weights if volatility is undefined).
        iv = 1.0 / (rets.std() + 1e-9)
        if not np.isfinite(iv).all():
            iv = pd.Series(1.0, index=rets.columns)
        weights = _cap_weights(iv, WMAX)
        latest["weights"] = {t: float(weights[t]) for t in weights.index}
    else:
        weights = pd.Series(w, index=tickers)
        latest["weights"] = dict(zip(tickers, weights.tolist()))
        latest["opt_summary"] = {"AnnReturn%": ann_ret * 100, "AnnVol%": ann_vol * 100, "Sharpe": sharpe}

    # Sort numerically, then format; sorting the formatted strings would
    # order "9.5%" above "30.0%".
    weights = weights.sort_values(ascending=False)
    tbl = (weights * 100).round(2).astype(str) + "%"
    return tbl.to_frame("Weight").to_markdown()


@tool
def show_metrics_table(_: str="") -> str:
    """Return a per-ticker risk & tone table.
    Columns: AnnReturn%, AnnVol%, Sharpe, SentScore, SentLabel. Markdown formatted.
    """
    metrics = _P_GLOBAL["latest"].get("metrics", pd.DataFrame()).copy()
    sentiments = _P_GLOBAL["latest"].get("sentiments", {})
    if metrics.empty:
        return "_No metrics available._"
    metrics["SentScore"] = [float(sentiments.get(t, {}).get("overall_score", 0.0)) for t in metrics.index]
    metrics["SentLabel"] = [sentiments.get(t, {}).get("overall_label", "unknown") for t in metrics.index]
    return metrics[["AnnReturn%", "AnnVol%", "Sharpe", "SentScore", "SentLabel"]].to_markdown()


@tool("retrieve_evidence")
def retrieve_evidence_tool(query: str) -> str:
    """Retrieve semantic facts collected during scoring to justify suggestions."""
    retr = _P_GLOBAL["latest"].get("retriever", None)
    if not retr:
        return "_No facts available._"
    hits = retr.query(query, top_k=8)
    return "\n".join([f"- {txt}" for txt, _ in hits]) if hits else "_No facts available._"


# ---- LangGraph flow ----
from langgraph.graph import StateGraph, END
from typing import TypedDict


class PortState(TypedDict, total=False):
    """State passed between portfolio graph nodes; every key is optional."""
    user_text: str
    universe_json: str
    score_json: str
    weights_md: str
    metrics_md: str
    evidence_md: str
    final_md: str


def n_p_uni(s: PortState) -> PortState:
    """Node: build the universe from the user's text."""
    return {"universe_json": build_universe.invoke(s["user_text"])}


def n_p_score(s: PortState) -> PortState:
    """Node: score the universe."""
    return {"score_json": score_universe.invoke("")}


def n_p_opt(s: PortState) -> PortState:
    """Node: optimise weights."""
    return {"weights_md": optimize_portfolio.invoke("max_sharpe")}


def n_p_metrics(s: PortState) -> PortState:
    """Node: render the metrics table."""
    return {"metrics_md": show_metrics_table.invoke("")}


def n_p_evid(s: PortState) -> PortState:
    """Node: retrieve supporting facts."""
    return {"evidence_md": retrieve_evidence_tool.invoke("diversification and risk drivers")}


def _corr_bucket(x: float) -> str:
    """Bucket an absolute correlation as low (<0.30), medium (<0.60), high, or unknown (NaN)."""
    if np.isnan(x):
        return "unknown"
    if x < 0.30:
        return "low"
    if x < 0.60:
        return "medium"
    return "high"


def n_p_write(s: PortState) -> PortState:
    """Node: assemble the final Markdown report from the state and ``_P_GLOBAL``."""
    try:
        uni = json.loads(s.get("universe_json", "{}"))
    except (TypeError, ValueError):
        uni = {}
    try:
        summ = json.loads(s.get("score_json", "{}"))
    except (TypeError, ValueError):
        summ = {}
    holdings = uni.get("holdings", []) or []
    recs = summ.get("top_candidates", []) or []
    corr = _P_GLOBAL["latest"].get("corr", pd.DataFrame())
    sentiments = _P_GLOBAL["latest"].get("sentiments", {})

    rows = []
    for t in recs:
        avgc = _avg_corr_to_holdings(corr, holdings, t)
        snt = sentiments.get(t, {})
        rows.append({"Ticker": t,
                     "AvgAbsCorrToHoldings": (None if np.isnan(avgc) else round(avgc, 2)),
                     "CorrBucket": _corr_bucket(avgc),
                     "SentLabel": snt.get("overall_label", "unknown"),
                     "SentScore": round(float(snt.get("overall_score", 0.0)), 2)})
    df_add = pd.DataFrame(rows)

    if df_add.empty:
        summary = ("No strong additions identified from the current universe. "
                   "Consider widening the universe or relaxing constraints to unlock diversification options.")
    else:
        order = {"low": 0, "medium": 1, "high": 2, "unknown": 3}
        # Lowest-correlation bucket first, then most positive sentiment.
        df_rank = df_add.sort_values(by=["CorrBucket", "SentScore"],
                                     key=lambda col: col.map(order) if col.name == "CorrBucket" else col,
                                     ascending=[True, False])
        top_names = df_rank["Ticker"].tolist()[:3]
        low_n = (df_add["CorrBucket"] == "low").sum()
        med_n = (df_add["CorrBucket"] == "medium").sum()
        pos_n = (df_add["SentLabel"] == "positive").sum()
        neg_n = (df_add["SentLabel"] == "negative").sum()
        s1 = f"Suggested additions to complement {', '.join(holdings) if holdings else 'your portfolio'}: {', '.join(recs)}."
        s2 = f"These tilt toward lower correlation (low={low_n}, medium={med_n}); top low-corr picks: {', '.join(top_names) if top_names else '—'}."
        s3 = f"Recent news tone for additions skews {('positive' if pos_n>=neg_n else 'mixed')} (pos={pos_n}, neg={neg_n})."
        summary = s1 + " " + s2 + " " + s3

    opt = _P_GLOBAL["latest"].get("opt_summary", {})
    perf_line = f"\n**Opt. Stats** — Ann. Return: {opt.get('AnnReturn%',0):.2f}% · Ann. Vol: {opt.get('AnnVol%',0):.2f}% · Sharpe: {opt.get('Sharpe',0):.2f}" if opt else ""

    lines = []
    lines.append("# Portfolio Optimization — Suggestions & Risk Analysis")
    lines.append(summary + perf_line)
    lines.append("\n## Recommended Additions")
    lines.append(("- " + ", ".join(recs)) if recs else "_No strong additions identified._")
    lines.append("\n## Optimized Weights (cap 30%)")
    lines.append(s.get("weights_md", "_No optimization result._"))
    lines.append("\n## Per-Ticker Risk & Sentiment")
    lines.append(s.get("metrics_md", "_No metrics._"))
    lines.append("\n## Evidence (facts retrieved)")
    lines.append(s.get("evidence_md", "_No facts available._"))
    lines.append("\n### Data Checks")
    neo_msg = summ.get("neo4j")
    if neo_msg:
        # Prefer the status actually reported by the scoring step over an assumption.
        lines.append(f"- {neo_msg}")
    else:
        lines.append("- Neo4j snapshot written." if _have_neo4j_driver() else "- Neo4j write skipped (disabled).")
    lines.append("\n*This is not financial advice.*")
    return {"final_md": "\n".join(lines)}


wf_p = StateGraph(PortState)
wf_p.add_node("universe", n_p_uni)
wf_p.add_node("score", n_p_score)
wf_p.add_node("opt", n_p_opt)
wf_p.add_node("metrics", n_p_metrics)
wf_p.add_node("evidence", n_p_evid)
wf_p.add_node("write", n_p_write)
wf_p.set_entry_point("universe")
wf_p.add_edge("universe", "score")
wf_p.add_edge("score", "opt")
wf_p.add_edge("opt", "metrics")
wf_p.add_edge("metrics", "evidence")
wf_p.add_edge("evidence", "write")
wf_p.add_edge("write", END)
port_agent = wf_p.compile()


def run_port_agent(user_text: str):
    """Run the portfolio graph on ``user_text`` and return the final Markdown."""
    out = port_agent.invoke({"user_text": user_text})
    return out.get("final_md", "")


# Cell 8 — Supervisor: route or consolidate (UPDATED)
# Intent keywords are matched on word boundaries so that e.g. "press" does not
# fire inside "express" or "media" inside "intermediate".
_PORTFOLIO_RE = re.compile(r"\b(?:optimi[sz]\w*|weight\w*|allocation\w*|diversif\w*|portfolio\w*|rebalanc\w*)\b")
_NEWS_RE = re.compile(r"\b(?:news|headlines?|sentiments?|media|press|articles?)\b")
_HIST_RE = re.compile(r"\b(?:trend\w*|historical|drawdowns?|sharpe|volatility|last year|1y|price history)\b")


def _looks_like_single_ticker(text: str) -> bool:
    """True when the cleaned text is at most 4 words and holds exactly one A-Z run of up to 6 letters."""
    t = _clean_text(text)
    toks = re.findall(r"[A-Z]{1,6}", t.upper())
    return len(toks) == 1 and len(t.strip().split()) <= 4


def _intent_router(user_text: str) -> str:
    """Classify a request as "portfolio", "news", "historical" or "consolidated" (the default)."""
    t = (_clean_text(user_text)).lower()
    if _PORTFOLIO_RE.search(t):
        return "portfolio"
    if _NEWS_RE.search(t):
        return "news"
    if _HIST_RE.search(t):
        return "historical"
    # Anything without an explicit intent gets the consolidated view.
    return "consolidated"


def supervisor_respond(user_text: str) -> str:
    """Route ``user_text`` to the right agent(s) and return Markdown.

    Any exception raised by routing or by an agent is reported in the
    returned text rather than propagated to the UI.
    """
    intent = _intent_router(user_text)
    try:
        if intent == "historical":
            md, tk = run_hist_agent(user_text)
            return f"## Supervisor — Routed to Historical ({tk})\n\n{md}"
        elif intent == "news":
            md, tk = run_news_agent(user_text)
            return f"## Supervisor — Routed to News ({tk})\n\n{md}"
        elif intent == "portfolio":
            if _looks_like_single_ticker(user_text):
                tkr = resolve_to_ticker(user_text)
                user_text = f"I have invested in {tkr}. Suggest a few stocks to diversify my portfolio."
            md = run_port_agent(user_text)
            return f"## Supervisor — Routed to Portfolio\n\n{md}"
        else:  # consolidated
            tk = resolve_to_ticker(user_text)
            hist_md, _ = run_hist_agent(tk)
            news_md, _ = run_news_agent(tk)
            port_prompt = f"I have invested in {tk}. Suggest a few stocks to diversify my portfolio."
            port_md = run_port_agent(port_prompt)
            return (
                f"# Consolidated View for {tk}\n"
                f"\n---\n\n{hist_md}\n\n---\n\n{news_md}\n\n---\n\n{port_md}"
            )
    except Exception as e:
        # Top-level boundary for the chat UI: show the failure to the user.
        return f"**Supervisor error:** {e}"


# Cell 9 — Gradio app (single box; supervisor decides)
import gradio as gr

APP_DESC = """Type a ticker (e.g., **AAPL**) for a consolidated view (Historical → News → Portfolio),
or ask specifically for **news**, **historical trends**, or **portfolio optimization** and the supervisor will route it."""


def chat_fn(message, history):
    """Gradio callback: answer ``message``; the chat history is ignored."""
    return supervisor_respond(message)


demo = gr.ChatInterface(
    fn=chat_fn,
    title="📊 Multi-Agent Equity Analyst (Historical + News + Portfolio)",
    description=APP_DESC,
    textbox=gr.Textbox(placeholder="e.g., AAPL | 'news on MSFT' | 'optimize my portfolio AAPL MSFT TSLA'"),
    cache_examples=False
)
demo.launch()

######################################################################################################################################

# === Minimal Offline Evaluation (20 tests) — Only 3 "Very Good" Metrics & Suppressed Warnings ===
# NOTE: only 15 tests are active; the 5 historical prompts are commented out below.
import re
import time
import numpy as np
import pandas as pd
import warnings
import logging

# Silence warnings and common noisy loggers
warnings.filterwarnings("ignore")
for name in ["yfinance", "neo4j", "neo4j.notifications", "neo4j.security", "neo4j.io", "urllib3"]:
    try:
        lg = logging.getLogger(name)
        lg.setLevel(logging.CRITICAL)
        lg.propagate = False
        lg.disabled = True
    except Exception:
        pass


# ---------- helpers (same logic; trimmed outputs) ----------
def _parse_route_and_ticker(md: str):
    """Infer ``(route, ticker)`` from the supervisor's Markdown.

    The route comes from the first line's heading. The ticker comes from that
    heading when present, otherwise from the first historical or news section
    heading found anywhere in ``md``.
    """
    first = md.strip().splitlines()[0] if md.strip() else ""
    route, ticker = "unknown", ""
    if first.startswith("## Supervisor — Routed to Historical"):
        route = "historical"
        m = re.search(r"\(([^)]+)\)", first)
        ticker = (m.group(1) if m else "")
    elif first.startswith("## Supervisor — Routed to News"):
        route = "news"
        m = re.search(r"\(([^)]+)\)", first)
        ticker = (m.group(1) if m else "")
    elif first.startswith("## Supervisor — Routed to Portfolio"):
        route = "portfolio"
    elif first.startswith("# Consolidated View for"):
        route = "consolidated"
        m = re.search(r"# Consolidated View for\s+([A-Z]{1,6})", first)
        ticker = (m.group(1) if m else "")
    if not ticker:
        m = re.search(r"#\s+([A-Z]{1,6})\s+—\s+Last 1Y Analysis", md)
        if m:
            ticker = m.group(1)
    if not ticker:
        m = re.search(r"#\s+([A-Z]{1,6})\s+—\s+Current News Sentiment", md)
        if m:
            ticker = m.group(1)
    return route, ticker


def _extract_kpis_from_md(md: str):
    """Parse the numeric KPIs printed in the report into a ``{name: float}`` dict."""
    pats = {
        "Total Return": r"Total Return:\s*([-+]?\d+(?:\.\d+)?)\s*%",
        "CAGR": r"CAGR:\s*([-+]?\d+(?:\.\d+)?)\s*%",
        "Annualized Volatility": r"Annualized Volatility:\s*([-+]?\d+(?:\.\d+)?)\s*%",
        "Sharpe": r"Sharpe\s*\(.*?\):\s*([-+]?\d+(?:\.\d+)?)",
        "Max Drawdown": r"Max Drawdown:\s*([-+]?\d+(?:\.\d+)?)\s*%",
    }
    out = {}
    for k, p in pats.items():
        m = re.search(p, md, flags=re.I)
        if m:
            out[k] = float(m.group(1))
    return out


def _numeric_targets_for(ticker: str):
    """Recompute reference KPIs for ``ticker`` with the app's own KPI functions."""
    df = get_last_year_data(ticker)
    k = compute_kpis(df, risk_free_rate=RISK_FREE_RATE)
    return {
        "Total Return": k["total_return"] * 100.0,
        "CAGR": k["cagr"] * 100.0,
        "Annualized Volatility": k["ann_vol"] * 100.0,
        "Sharpe": k["sharpe"],
        "Max Drawdown": k["max_drawdown"] * 100.0,
    }


def _mape_percent_metrics(pred: dict, targ: dict):
    """Mean absolute percentage error over shared percent KPIs (NaN if none are shared)."""
    keys = sorted(set(pred) & set(targ))
    if not keys:
        return np.nan
    rel_errs = []
    for k in keys:
        if k == "Sharpe":  # exclude non-% metric from MAPE
            continue
        p, t = float(pred[k]), float(targ[k])
        denom = max(1e-6, abs(t))
        rel_errs.append(abs(p - t) / denom)
    return (100.0 * float(np.mean(rel_errs))) if rel_errs else np.nan


def _section(md: str, title: str):
    """Return the body of the ``## <title>`` section, up to the next ``## `` heading or the end."""
    m = re.search(rf"##\s*{re.escape(title)}(.*?)(?=\n##\s|\Z)", md, flags=re.S)
    return m.group(1).strip() if m else ""


def _extract_weights_from_md(md: str):
    """Parse ``{ticker: weight}`` (as fractions) from the "Optimized Weights" table."""
    sec = _section(md, "Optimized Weights")
    if not sec:
        return {}
    pairs = re.findall(r"\n\|?\s*([A-Z][A-Z.\-]{0,6})\s*\|\s*([\d.]+)\s*%", sec) or \
            re.findall(r"\n([A-Z][A-Z.\-]{0,6})\s+([\d.]+)\s*%", sec)
    out = {}
    for t, v in pairs:
        try:
            out[t] = float(v) / 100.0
        except Exception:
            pass
    return out


def _portfolio_sanity(weights: dict, wmax=0.30, tol=0.005):
    """True when the weights sum to 1 within ``tol`` and none exceeds ``wmax``."""
    if not weights:
        return False
    s_ok = abs(sum(weights.values()) - 1.0) <= tol
    cap_ok = all((w <= wmax + 1e-9) for w in weights.values())
    return bool(s_ok and cap_ok)


# ---------- 20-test suite ----------
TESTS = [
    # Consolidated (single tickers)
    {"prompt": "AAPL", "expect_intent": "consolidated"},
    {"prompt": "NVDA", "expect_intent": "consolidated"},
    {"prompt": "GOOGL", "expect_intent": "consolidated"},
    {"prompt": "AMZN", "expect_intent": "consolidated"},
    {"prompt": "META", "expect_intent": "consolidated"},
    # News (kept for routing quality, we are not reporting news metric)
    {"prompt": "news for MSFT", "expect_intent": "news"},
    {"prompt": "news for TSLA", "expect_intent": "news"},
    {"prompt": "news on AAPL", "expect_intent": "news"},
    {"prompt": "latest headlines for NVDA", "expect_intent": "news"},
    {"prompt": "news about AMZN", "expect_intent": "news"},
    # Portfolio optimization / diversification
    {"prompt": "optimize portfolio AAPL MSFT TSLA", "expect_intent": "portfolio"},
    {"prompt": "rebalance portfolio NVDA AMD AVGO", "expect_intent": "portfolio"},
    {"prompt": "diversify my portfolio META GOOGL AMZN", "expect_intent": "portfolio"},
    {"prompt": "weights for SPY VTI VXUS BND", "expect_intent": "portfolio"},
    {"prompt": "optimize holdings JPM BAC WFC", "expect_intent": "portfolio"},
    # # Historical queries
    # {"prompt": "what is the volatility for NVDA last year", "expect_intent": "historical"},
    # {"prompt": "drawdown for AAPL last year", "expect_intent": "historical"},
    # {"prompt": "1y trend for MSFT", "expect_intent": "historical"},
    # {"prompt": "sharpe of AMZN last year", "expect_intent": "historical"},
    # {"prompt": "historical analysis of META", "expect_intent": "historical"},
]

# ---------- run & report ONLY the 3 best metrics ----------
route_hits, kpi_mapes, port_passes = [], [], []
for t in TESTS:
    expect = t["expect_intent"]
    md = supervisor_respond(t["prompt"])  # uses your agents
    route, tk = _parse_route_and_ticker(md)

    # 1) Routing accuracy
    route_hits.append(int(route == expect))

    # 2) KPI MAPE (only when we have a single ticker route that prints KPIs)
    mape = np.nan
    if tk and route in ("historical", "consolidated"):
        try:
            pred = _extract_kpis_from_md(md)
            targ = _numeric_targets_for(tk)
            mape = _mape_percent_metrics(pred, targ)
        except Exception:
            mape = np.nan
    kpi_mapes.append(mape)

    # 3) Portfolio sanity (only for portfolio/consolidated routes)
    if route in ("portfolio", "consolidated"):
        weights = _extract_weights_from_md(md)
        port_passes.append(int(_portfolio_sanity(weights)))
    else:
        port_passes.append(np.nan)

routing_accuracy = round(100.0 * (np.nanmean(route_hits) if route_hits else 0.0), 1)
kpi_mape_mean = (None if not np.isfinite(np.nanmean(kpi_mapes)) else round(np.nanmean(kpi_mapes), 3))
port_pass_rate = (None if not np.isfinite(np.nanmean(port_passes)) else round(100.0 * np.nanmean(port_passes), 1))

summary_3 = {
    "routing_accuracy_%": routing_accuracy,
    "kpi_mape_mean_%": kpi_mape_mean,
    "portfolio_sanity_pass_rate_%": port_pass_rate,
}

# Print ONLY the 3 metrics
for k, v in summary_3.items():
    print(f"{k}: {v}")

# === Minimal Offline Evaluation (20 tests) — 4 Final Metrics incl. Latency; Suppress Warnings/Logs ===
import re
import time
import numpy as np
import pandas as pd
import warnings
import logging
import contextlib
import io
import sys

# Silence Python warnings and common noisy loggers
warnings.filterwarnings("ignore")
for name in ["yfinance", "neo4j", "neo4j.notifications", "neo4j.security", "neo4j.io", "urllib3"]:
    try:
        lg = logging.getLogger(name)
        lg.setLevel(logging.CRITICAL)
        lg.propagate = False
        lg.disabled = True
    except Exception:
        pass


# Helper to suppress stray prints from libraries during calls
@contextlib.contextmanager
def _quiet_io():
    """Temporarily replace ``sys.stdout`` / ``sys.stderr`` with in-memory buffers."""
    stdout, stderr = sys.stdout, sys.stderr
    try:
        sys.stdout, sys.stderr = io.StringIO(), io.StringIO()
        yield
    finally:
        sys.stdout, sys.stderr = stdout, stderr


# ---------- helpers (reuse your app's behavior; no extra outputs) ----------
# _parse_route_and_ticker, _extract_kpis_from_md, _section, _extract_weights_from_md,
# _portfolio_sanity and TESTS are reused from the evaluation section above; this
# section previously re-declared them with identical behaviour.

def _numeric_targets_for(ticker: str):
    """Recompute reference percent KPIs for ``ticker`` (Sharpe is omitted: not part of MAPE)."""
    df = get_last_year_data(ticker)
    k = compute_kpis(df, risk_free_rate=RISK_FREE_RATE)
    return {
        "Total Return": k["total_return"] * 100.0,
        "CAGR": k["cagr"] * 100.0,
        "Annualized Volatility": k["ann_vol"] * 100.0,
        "Max Drawdown": k["max_drawdown"] * 100.0,
        # Sharpe not used in MAPE, so we don't need it here
    }


def _mape_percent_metrics(pred: dict, targ: dict):
    """Mean absolute percentage error over the KPIs present in both dicts (NaN if none)."""
    keys = sorted(set(pred) & set(targ))
    if not keys:
        return np.nan
    rel_errs = []
    for k in keys:
        p, t = float(pred[k]), float(targ[k])
        denom = max(1e-6, abs(t))
        rel_errs.append(abs(p - t) / denom)
    return (100.0 * float(np.mean(rel_errs))) if rel_errs else np.nan


# ---------- run & compute exactly 4 final metrics (routing, KPI MAPE, portfolio sanity, latency) ----------
route_hits, kpi_mapes, port_passes, latencies = [], [], [], []

for t in TESTS:
    expect = t["expect_intent"]
    with _quiet_io():  # suppress noisy prints/warnings during one inference
        t0 = time.perf_counter()  # monotonic clock for interval timing
        md = supervisor_respond(t["prompt"])  # uses your agents
        dt = time.perf_counter() - t0
    latencies.append(dt)

    route, tk = _parse_route_and_ticker(md)
    route_hits.append(int(route == expect))

    # KPI MAPE (only for routes that actually show KPIs for a single ticker)
    mape = np.nan
    if tk and route in ("historical", "consolidated"):
        try:
            pred = _extract_kpis_from_md(md)
            targ = _numeric_targets_for(tk)
            mape = _mape_percent_metrics(pred, targ)
        except Exception:
            mape = np.nan
    kpi_mapes.append(mape)

    # Portfolio sanity (only for portfolio/consolidated routes)
    if route in ("portfolio", "consolidated"):
        weights = _extract_weights_from_md(md)
        port_passes.append(int(_portfolio_sanity(weights)))
    else:
        port_passes.append(np.nan)

routing_accuracy = round(100.0 * (np.nanmean(route_hits) if route_hits else 0.0), 1)
kpi_mape_mean = (None if not np.isfinite(np.nanmean(kpi_mapes)) else round(np.nanmean(kpi_mapes), 3))
port_pass_rate = (None if not np.isfinite(np.nanmean(port_passes)) else round(100.0 * np.nanmean(port_passes), 1))
lat_p50 = (None if not latencies else round(float(np.percentile(latencies, 50)), 3))
lat_p95 = (None if not latencies else round(float(np.percentile(latencies, 95)), 3))

# Print ONLY the 4 metrics (latency reported as a single metric with p50/p95)
print(f"routing_accuracy_%: {routing_accuracy}")
print(f"kpi_mape_mean_%: {kpi_mape_mean}")
print(f"portfolio_sanity_pass_rate_%: {port_pass_rate}")
print(f"latency_s: p50={lat_p50}, p95={lat_p95}")