"""HTTP API exposing model predictions and class-vs-feature curve plots."""

import io
import json
import logging
import time
import traceback
from typing import Optional, Dict, Any

import matplotlib

# Select the non-interactive backend before pyplot is imported.
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from pydantic import BaseModel, Field

from model_pipeline import Predictor, FEATURE_MAP, LABELS

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
log = logging.getLogger("api")


# ----------- input model -----------
class PredictIn(BaseModel):
    """Request body shared by the prediction and plotting endpoints.

    Every feature is optional. Fields whose original feature name contains
    spaces are exposed through an alias carrying that original name; thanks
    to ``populate_by_name`` they may also be sent under the attribute name.
    Unknown keys are accepted (``extra = "allow"``).
    """

    # Forwarded to Predictor.predict_dict by the /predict endpoint.
    include_neg: bool = False

    Debitore_cluster: Optional[str] = None
    Stato_Giudizio: Optional[str] = None
    Cedente: Optional[str] = None

    # Aliases for feature names containing spaces.
    Importo_iniziale_outstanding: Optional[float] = Field(None, alias="Importo iniziale outstanding")
    Decreto_sospeso: Optional[str] = Field(None, alias="Decreto sospeso")
    Notifica_Decreto: Optional[str] = Field(None, alias="Notifica Decreto")
    Opposizione_al_decreto_ingiuntivo: Optional[str] = Field(None, alias="Opposizione al decreto ingiuntivo")
    Ricorso_al_TAR: Optional[str] = Field(None, alias="Ricorso al TAR")
    Sentenza_TAR: Optional[str] = Field(None, alias="Sentenza TAR")
    Atto_di_Precetto: Optional[str] = Field(None, alias="Atto di Precetto")
    Decreto_Ingiuntivo: Optional[str] = Field(None, alias="Decreto Ingiuntivo")
    Sentenza_giudizio_opposizione: Optional[str] = Field(None, alias="Sentenza giudizio opposizione")

    giorni_da_iscrizione: Optional[int] = None
    giorni_da_cessione: Optional[int] = None
    Zona: Optional[str] = None

    model_config = {"populate_by_name": True, "extra": "allow"}


# ----------- app -----------
app = FastAPI(title="Predizione+SHAP API", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)

# Process start time, used by /health to report uptime.
t0 = time.time()
# Set by the startup hook; None until the model has been loaded.
predictor: Predictor | None = None

# Upper bounds for the plot endpoints: each base point costs one model
# inference and each dense point is allocated in memory, and both values
# come straight from query parameters.
_MAX_BASE_POINTS = 2000
_MAX_DENSE_POINTS = 20000


# NOTE(review): newer FastAPI releases prefer lifespan handlers over
# on_event -- consider migrating once the deployed version is confirmed.
@app.on_event("startup")
def _load_model():
    """Instantiate the global predictor when the application starts."""
    global predictor
    predictor = Predictor()
    log.info("Model loaded in %.2fs", predictor.load_seconds)


@app.get("/health")
def health():
    """Report whether the model is loaded and the process uptime in seconds."""
    return {"ok": predictor is not None, "uptime_s": time.time() - t0}


# Class order (same one used by the model).
_CLASS_ORDER = LABELS + ["100%"]
_CLASS_TO_IDX = {c: i for i, c in enumerate(_CLASS_ORDER)}


def _payload_from_inp(inp) -> dict:
    """Rebuild the feature payload dict from the pydantic input.

    Keys are the original feature names (the values of ``FEATURE_MAP``).
    Each value is read from the attribute obtained by replacing spaces and
    dots in the feature name with underscores; missing attributes yield
    ``None``.
    """
    payload = {}
    for feature_name in FEATURE_MAP.values():
        attr_name = feature_name.replace(" ", "_").replace(".", "_")
        payload[feature_name] = getattr(inp, attr_name, None)
    return payload


def _moving_average(y: np.ndarray, window: int = 9) -> np.ndarray:
    """Smooth a 1-D series with a centred simple moving average.

    Args:
        y: 1-D series to smooth.
        window: Requested window length. Values below 1 disable smoothing;
            even values are bumped to the next odd number so the window is
            centred; windows longer than the series are shrunk to the
            largest odd length that fits.

    Returns:
        An array with exactly ``len(y)`` samples. The series is padded with
        its edge values before averaging, so the ends are not pulled
        toward zero.
    """
    y = np.asarray(y, dtype=float)
    w = int(window)
    if w < 1 or y.size == 0:
        return y
    if w % 2 == 0:
        w += 1
    if w > y.size:
        # Largest odd window not exceeding the series length.
        w = y.size if y.size % 2 == 1 else y.size - 1
    if w <= 1:
        return y
    half = w // 2
    # Edge padding + "valid" keeps the output aligned with the input and
    # avoids the implicit zero padding performed by mode="same".
    padded = np.pad(y, half, mode="edge")
    kernel = np.ones(w) / w
    return np.convolve(padded, kernel, mode="valid")


def _class_curve_png(predictor,
                     base_payload: dict,
                     var_name: str,
                     vmin: int = 0,
                     vmax: int = 3000,
                     n_base: int = 80,      # real points (model inferences)
                     n_dense: int = 400,    # interpolated points
                     ma_window: int = 9,
                     title: str = "") -> bytes:
    """Render the predicted class as a function of one integer feature.

    The feature ``var_name`` is swept over ``[vmin, vmax]`` while every
    other entry of ``base_payload`` is held fixed. The model is queried at
    up to ``n_base`` distinct integer values; the resulting class indices
    are linearly interpolated onto ``n_dense`` points, smoothed with a
    moving average of ``ma_window`` samples and plotted.

    Args:
        predictor: Object exposing ``predict_class_fast(payload)`` that
            returns a mapping with a ``"class"`` entry found in the class
            order.
        base_payload: Feature payload; it is copied, never mutated.
        var_name: Payload key to sweep.
        vmin: Lower bound of the sweep (inclusive).
        vmax: Upper bound of the sweep (inclusive).
        n_base: Number of sweep points requested before de-duplication.
        n_dense: Number of points on the plotted curve.
        ma_window: Moving-average window length.
        title: Plot title; a default is derived from ``var_name`` if empty.

    Returns:
        The PNG image as bytes.

    Raises:
        ValueError: If ``vmin > vmax`` or a point count is below 1.
        KeyError: If the model returns a class that is not in the class
            order.
    """
    if vmin > vmax:
        raise ValueError("vmin must be <= vmax")
    if n_base < 1 or n_dense < 1:
        raise ValueError("n_base and n_dense must be >= 1")

    # Integer sweep values: rounded, kept in range, duplicates removed
    # (np.unique also returns them sorted, as np.interp requires).
    xs_base = np.linspace(vmin, vmax, n_base).round().astype(int)
    xs_base = np.unique(np.clip(xs_base, vmin, vmax))

    # class -> index, one inference per sweep value
    class_indices = []
    for value in xs_base:
        sample = dict(base_payload)
        sample[var_name] = int(value)
        result = predictor.predict_class_fast(sample)
        class_indices.append(_CLASS_TO_IDX[result["class"]])
    y_base = np.array(class_indices, dtype=float)

    # interpolation
    xs_dense = np.linspace(vmin, vmax, n_dense)
    y_dense = np.interp(xs_dense, xs_base, y_base)

    # smoothing
    top_idx = len(_CLASS_ORDER) - 1
    y_smooth = np.clip(_moving_average(y_dense, ma_window), 0, top_idx)

    # plot
    fig, ax = plt.subplots(figsize=(9, 4))
    try:
        ax.plot(xs_dense, y_smooth, linewidth=2)
        ax.set_xlim(vmin, vmax)
        ax.set_ylim(-0.2, top_idx + 0.2)
        ax.set_yticks(range(len(_CLASS_ORDER)))
        ax.set_yticklabels(_CLASS_ORDER)
        ax.set_xlabel(var_name)
        ax.set_ylabel("Classe (smooth)")
        ax.set_title(title or f"Classe (smooth) vs {var_name}")
        ax.grid(True, linestyle="--", alpha=0.35)
        fig.tight_layout()

        buf = io.BytesIO()
        fig.savefig(buf, format="png", dpi=150, bbox_inches="tight")
    finally:
        # pyplot keeps a global reference to every open figure; always
        # close it so a failed render does not leak memory.
        plt.close(fig)
    return buf.getvalue()


def _log_prediction(out) -> None:
    """Log a one-line JSON summary of a successful prediction.

    Never lets a formatting problem turn a good prediction into an error:
    ``p100`` is coerced to a float when possible and anything that is not
    JSON-serializable is stringified.
    """
    p100 = out.get("p100", 0.0)
    try:
        p100 = round(float(p100), 4)
    except (TypeError, ValueError):
        pass  # log the raw value instead
    log.info(json.dumps({
        "event": "predict_ok",
        "class": out.get("class"),
        "stage": out.get("stage_used"),
        "p100": p100
    }, default=str))


@app.post("/predict")
def predict(inp: PredictIn):
    """Run the model on the submitted features and return its output.

    Raises:
        HTTPException: 503 if the model is not loaded yet, 500 if the
            prediction fails.
    """
    if predictor is None:
        raise HTTPException(503, "Model not ready")

    # Rebuild the payload under the original feature names.
    payload: Dict[str, Any] = _payload_from_inp(inp)
    payload["include_neg"] = inp.include_neg

    try:
        out = predictor.predict_dict(payload, include_neg=inp.include_neg)
        # Make sure the result is exposed under the 'class' key.
        if "class_" in out and "class" not in out:
            out["class"] = out.pop("class_")
    except Exception as e:
        log.exception("predict_error")
        raise HTTPException(500, f"Prediction error: {e}") from e

    _log_prediction(out)
    return out


def _curve_response(inp: PredictIn, var_name: str, title: str,
                    vmin: int, vmax: int, n_base: int, n_dense: int,
                    ma_window: int) -> Response:
    """Validate the sweep parameters and return the curve as a PNG response.

    Raises:
        HTTPException: 503 if the model is not loaded yet, 422 if the sweep
            parameters are inconsistent or exceed the allowed bounds.
    """
    if predictor is None:
        raise HTTPException(503, "Model not ready")
    if vmin > vmax:
        raise HTTPException(422, "vmin must be <= vmax")
    if not 1 <= n_base <= _MAX_BASE_POINTS:
        raise HTTPException(422, f"n_base must be between 1 and {_MAX_BASE_POINTS}")
    if not 1 <= n_dense <= _MAX_DENSE_POINTS:
        raise HTTPException(422, f"n_dense must be between 1 and {_MAX_DENSE_POINTS}")

    img = _class_curve_png(
        predictor, _payload_from_inp(inp),
        var_name=var_name,
        vmin=vmin, vmax=vmax,
        n_base=n_base, n_dense=n_dense,
        ma_window=ma_window,
        title=title
    )
    return Response(content=img, media_type="image/png")


@app.post("/plot/curve-class-cessione.png")
def plot_curve_class_cessione(inp: PredictIn,
                              vmin: int = 0,
                              vmax: int = 3000,
                              n_base: int = 80,
                              n_dense: int = 400,
                              ma_window: int = 9):
    """Plot the predicted class while sweeping ``giorni_da_cessione``."""
    return _curve_response(
        inp,
        var_name="giorni_da_cessione",
        title="Classe predetta vs Giorni da Cessione",
        vmin=vmin, vmax=vmax,
        n_base=n_base, n_dense=n_dense,
        ma_window=ma_window
    )


@app.post("/plot/curve-class-iscrizione.png")
def plot_curve_class_iscrizione(inp: PredictIn,
                                vmin: int = 0,
                                vmax: int = 3000,
                                n_base: int = 80,
                                n_dense: int = 400,
                                ma_window: int = 9):
    """Plot the predicted class while sweeping ``giorni_da_iscrizione``."""
    return _curve_response(
        inp,
        var_name="giorni_da_iscrizione",
        title="Classe predetta (smooth) vs Giorni da Iscrizione",
        vmin=vmin, vmax=vmax,
        n_base=n_base, n_dense=n_dense,
        ma_window=ma_window
    )