"""Two-stage recovery predictor with SHAP-based explanations.

Stage 1 is a logistic regression estimating the probability of a full (100%)
recovery; stage 2 is an ordinal XGBoost model over the remaining recovery
levels. ``Predictor`` fits both on the CSV at ``DATA_PATH`` when constructed
and then scores single payload dictionaries.
"""
import time
import uuid
import warnings

import numpy as np
import pandas as pd

# Installed before the ML libraries below are imported, as in the original
# import order.
warnings.filterwarnings("ignore")

from sklearn.linear_model import LogisticRegression
from sklearn.base import BaseEstimator, ClassifierMixin
import xgboost as xgb
import shap

# -------------------- CONFIG --------------------
DATA_PATH = "/app/data/final_report.csv"  # make sure this file exists inside the container!

# Only the values (CSV column names) are read in this module; the dotted keys
# are not referenced here.
FEATURE_MAP = {
    "Debitore_cluster": "Debitore_cluster",
    "Stato_Giudizio": "Stato_Giudizio",
    "Cedente": "Cedente",
    "Importo.iniziale.outstanding": "Importo iniziale outstanding",
    "Decreto.sospeso": "Decreto sospeso",
    "Notifica.Decreto": "Notifica Decreto",
    "Opposizione.al.decreto.ingiuntivo": "Opposizione al decreto ingiuntivo",
    "Ricorso.al.TAR": "Ricorso al TAR",
    "Sentenza.TAR": "Sentenza TAR",
    "Atto.di.Precetto": "Atto di Precetto",
    "Decreto.Ingiuntivo": "Decreto Ingiuntivo",
    "Sentenza.giudizio.opposizione": "Sentenza giudizio opposizione",
    "giorni_da_iscrizione": "giorni_da_iscrizione",
    "giorni_da_cessione": "giorni_da_cessione",
    "Zona": "Zona",
}

# Ordinal target: `incassi_perc` is cut into LABELS using BINS; MIDPOINTS are
# the per-level values used to compute the expected value.
LABELS = ["quasi_nulla", "bassa", "media", "alta"]
BINS = [0, 11, 30, 70, 100]
MIDPOINTS = np.array([5.5, 20.5, 50.0, 85.0])

# Bucketing of the two day counters and of the initial outstanding amount.
MONTH_BINS_DAYS = np.array([0, 30, 60, 90, 180, 360, 720, 1e9], dtype=float)
MONTH_LABELS = ["<1m", "1–2m", "2–3m", "3–6m", "6–12m", "12–24m", ">=24m"]
IMPORTO_BINS = [0.0, 1_000.0, 10_000.0, 50_000.0, 100_000.0, 500_000.0, 1_000_000.0, 2_000_000.0]
IMPORTO_LABELS = ["<1k", "1–10k", "10–50k", "50–100k", "100–500k", "500k–1M", ">=1M"]

RANDOM_STATE = 42
# Stage-1 probability at or above which the predicted class is "100%".
P100_THR_AUTO = 0.71

STAGE1_LOGIT_PARAMS = dict(max_iter=500, solver='liblinear')
STAGE2_ORD_XGB_PARAMS = dict(
    objective="binary:logistic",
    n_estimators=700,
    learning_rate=0.05,
    max_depth=4,
    subsample=0.9,
    colsample_bytree=0.85,
    min_child_weight=2.0,
    gamma=0.1,
    reg_lambda=5.0,
    reg_alpha=0.5,
    n_jobs=-1,
    random_state=RANDOM_STATE,
    verbosity=0,
)

# Explanation settings: how many contributions are listed in the structured
# output / in the one-line summary, and the minimum |contribution| shown.
TOP_K_TEXT = 3
MIN_ABS_SHOW = 0.01
TOP_K_ONELINER = 2

# Target-related columns that travel with the feature frame but are not features.
_TARGET_COLS = ["incassi_perc_capped", "y100", "livello"]

# Derived categorical bins: name -> (source column, params key of the bin
# edges, params key of the bin labels). Order matters: it fixes the order in
# which the columns are one-hot encoded.
_DERIVED_BINS = {
    "iscr_month_bin": ("giorni_da_iscrizione", "month_bins_days", "month_labels"),
    "cess_month_bin": ("giorni_da_cessione", "month_bins_days", "month_labels"),
    "imp_bucket": ("Importo iniziale outstanding", "importo_bins", "importo_labels"),
}


# -------------------- ORDINAL MODEL --------------------
class OrdinalXGB(BaseEstimator, ClassifierMixin):
    """Ordinal classifier made of ``n_classes - 1`` binary XGBoost models.

    Model ``k`` (1-based) estimates the cumulative probability ``P(y >= k)``;
    class probabilities are obtained by differencing those estimates.
    """

    def __init__(self, n_classes=4, **xgb_params):
        self.n_classes = n_classes
        self.xgb_params = xgb_params
        self.models = []

    def fit(self, X, y, sample_weight=None):
        """Fit one binary classifier per threshold ``y >= k`` and return self.

        ``y`` holds integer class codes in ``0 .. n_classes - 1``.
        """
        y = np.asarray(y)  # accept lists / Series as well as arrays
        self.models = []
        for k in range(1, self.n_classes):
            y_bin = (y >= k).astype(int)
            clf = xgb.XGBClassifier(**self.xgb_params)
            clf.fit(X, y_bin, sample_weight=sample_weight)
            self.models.append(clf)
        return self

    def _cum_probs(self, X):
        """Return an (n_samples, n_classes - 1) matrix of ``P(y >= k)``.

        Probabilities are clipped away from 0/1 and forced to be
        non-increasing in ``k`` so that their differences are non-negative.
        """
        cps = np.vstack([clf.predict_proba(X)[:, 1] for clf in self.models]).T
        cps = np.clip(cps, 1e-6, 1 - 1e-6)
        for k in range(1, cps.shape[1]):
            cps[:, k] = np.minimum(cps[:, k - 1], cps[:, k])
        return cps

    def predict_proba(self, X):
        """Return per-class probabilities, one row per sample, rows summing to 1."""
        cps = self._cum_probs(X)
        proba = np.zeros((cps.shape[0], self.n_classes))
        proba[:, 0] = 1 - cps[:, 0]
        for k in range(1, self.n_classes - 1):
            proba[:, k] = cps[:, k - 1] - cps[:, k]
        proba[:, -1] = cps[:, -1]
        totals = proba.sum(axis=1, keepdims=True)
        totals[totals == 0] = 1.0  # avoid division by zero on degenerate rows
        return np.clip(proba / totals, 0, 1)

    def predict(self, X):
        """Return the most probable class code for each sample."""
        return np.argmax(self.predict_proba(X), axis=1)


def mode_(s: pd.Series):
    """Return the most frequent non-null value of ``s`` (NaN if there is none)."""
    s = s.dropna()
    return s.mode().iloc[0] if len(s) else np.nan


class Predictor:
    """Fits the two-stage model on construction and scores single payloads."""

    def __init__(self, data_path=DATA_PATH):
        """Load the CSV at ``data_path``, fit both stages and build the SHAP explainers.

        The elapsed time is stored in ``self.load_seconds``.
        """
        t0 = time.time()
        self.data_path = data_path
        df = pd.read_csv(self.data_path)

        # Target: infinities and missing values are treated as 100, then the
        # percentage is capped to [0, 100].
        inc = df['incassi_perc'].replace([np.inf, -np.inf], np.nan).fillna(100.0).clip(0, 100)
        df_model = df[[v for v in FEATURE_MAP.values() if v in df.columns]].copy()
        df_model['incassi_perc_capped'] = inc
        df_model['y100'] = (inc >= 100.0 - 1e-9).astype(int)
        df_model['livello'] = pd.cut(
            np.minimum(inc, 99.999), bins=BINS, labels=LABELS,
            right=False, include_lowest=True,
        )

        # Split the available features by the dtype pandas inferred for them.
        self.num_cols, self.cat_cols = [], []
        for c in FEATURE_MAP.values():
            if c in df_model.columns:
                if pd.api.types.is_numeric_dtype(df_model[c]):
                    self.num_cols.append(c)
                else:
                    self.cat_cols.append(c)

        self.params, full_oh = self.preprocess_fit(df_model)
        self.feat_cols_full = [c for c in full_oh.columns if c not in _TARGET_COLS]

        # Stage 1: full recovery vs. not, on all rows.
        self.stage1_final = LogisticRegression(**STAGE1_LOGIT_PARAMS).fit(
            full_oh[self.feat_cols_full], full_oh['y100']
        )

        # Stage 2: ordinal level, only on rows without full recovery.
        full_lt = full_oh[full_oh['y100'] == 0].copy()
        y_ord_full = pd.Categorical(full_lt['livello'], categories=LABELS, ordered=True).codes
        self.stage2_final = OrdinalXGB(n_classes=4, **STAGE2_ORD_XGB_PARAMS).fit(
            full_lt[self.feat_cols_full].values, y_ord_full
        )

        # NOTE(review): initjs() looks notebook-oriented; confirm it is needed here.
        shap.initjs()
        rng = np.random.RandomState(0)
        bg_idx = rng.choice(len(full_oh), size=min(200, len(full_oh)), replace=False)
        bg_matrix = full_oh.iloc[bg_idx][self.feat_cols_full].values
        # NOTE(review): stage 1 uses an identity link, so its SHAP values are
        # presumably on the linear-model scale rather than probability points,
        # while downstream output labels them "delta_pp" — TODO confirm.
        self.explainer_st1 = shap.LinearExplainer(
            self.stage1_final, bg_matrix, link=shap.links.identity
        )
        self.explainers_st2 = [
            shap.TreeExplainer(
                clf, bg_matrix, model_output="probability",
                feature_perturbation="interventional",
            )
            for clf in self.stage2_final.models
        ]
        self.ORIGINAL_CAT_COLS = [c for c in self.cat_cols]
        self.load_seconds = time.time() - t0

    @staticmethod
    def _add_derived_bins(frame: pd.DataFrame, params: dict) -> pd.DataFrame:
        """Add the binned day/amount columns of ``_DERIVED_BINS`` to ``frame`` in place."""
        for name, (src, bins_key, labels_key) in _DERIVED_BINS.items():
            frame[name] = pd.cut(
                frame[src], np.array(params[bins_key], dtype=float),
                labels=params[labels_key], right=False, include_lowest=True,
            )
        return frame

    def preprocess_fit(self, train_df: pd.DataFrame):
        """Learn the preprocessing parameters and return ``(params, encoded_frame)``.

        Imputes numeric columns with the mean and categorical ones with the
        mode, adds binned and scaled versions of the day counters and of the
        amount, and one-hot encodes the categorical columns (first level
        dropped). ``params`` stores everything ``preprocess_apply`` needs.
        """
        params = {}
        means = {c: train_df[c].mean(skipna=True) for c in self.num_cols}
        modes = {c: mode_(train_df[c]) for c in self.cat_cols}

        tr = train_df.copy()
        for c in self.num_cols:
            tr[c] = tr[c].fillna(means[c])
        for c in self.cat_cols:
            tr[c] = tr[c].fillna(modes[c]).astype(str)

        one_level = [c for c in self.cat_cols if tr[c].nunique(dropna=True) < 2]
        keep_cats = [c for c in self.cat_cols if c not in one_level]
        params['removed_cats'] = one_level
        # Constant categoricals carry no signal; drop them here so raw string
        # columns never end up in the feature matrix.
        tr = tr.drop(columns=one_level)

        params['month_bins_days'] = MONTH_BINS_DAYS.tolist()
        params['month_labels'] = MONTH_LABELS
        params['importo_bins'] = IMPORTO_BINS
        params['importo_labels'] = IMPORTO_LABELS
        self._add_derived_bins(tr, params)
        for name in _DERIVED_BINS:
            if tr[name].nunique(dropna=True) >= 2:
                if name not in keep_cats:
                    keep_cats.append(name)
            else:
                # A bin column with fewer than two observed levels is not
                # encoded, so it must not remain as a raw categorical feature.
                tr = tr.drop(columns=[name])

        params['keep_cats'] = keep_cats
        params['levels_map'] = {
            c: sorted(tr[c].astype(str).dropna().unique().tolist()) for c in keep_cats
        }

        # Scaled numeric features; a zero standard deviation falls back to 1.0.
        x_imp_log = np.log1p(tr['Importo iniziale outstanding'].clip(lower=0))
        params['scale_imp'] = (x_imp_log.mean(), x_imp_log.std(ddof=0) or 1.0)
        tr['x_imp_log'] = (x_imp_log - params['scale_imp'][0]) / params['scale_imp'][1]

        g_iscr_log = np.log(tr['giorni_da_iscrizione'].clip(lower=1))
        params['scale_iscr'] = (g_iscr_log.mean(), g_iscr_log.std(ddof=0) or 1.0)
        tr['giorni_log'] = (g_iscr_log - params['scale_iscr'][0]) / params['scale_iscr'][1]

        g_cess = tr['giorni_da_cessione']
        params['scale_cess'] = (g_cess.mean(), g_cess.std(ddof=0) or 1.0)
        tr['giorni_cessione_z'] = (g_cess - params['scale_cess'][0]) / params['scale_cess'][1]

        tr = tr.drop(columns=['Importo iniziale outstanding', 'giorni_da_iscrizione', 'giorni_da_cessione'])
        tr_oh = pd.get_dummies(tr, columns=keep_cats, drop_first=True, dtype=float)

        params['oh_columns'] = [c for c in tr_oh.columns if c not in _TARGET_COLS]
        params['means'] = means
        params['modes'] = modes
        return params, tr_oh

    def preprocess_apply(self, test_df: pd.DataFrame):
        """Apply the fitted preprocessing to ``test_df``.

        Returns a frame whose columns are ``params['oh_columns']`` (in that
        order) followed by any target columns present in the input.
        """
        te = test_df.copy()
        for c in self.num_cols:
            # Payload values may arrive as None or strings; coerce to numbers
            # so the log / cut transforms below work, then impute the mean.
            te[c] = pd.to_numeric(te[c], errors="coerce").fillna(self.params['means'][c])
        for c in self.cat_cols:
            te[c] = te[c].fillna(self.params['modes'][c]).astype(str)
        te = te.drop(columns=self.params['removed_cats'], errors='ignore')

        self._add_derived_bins(te, self.params)

        x_imp_log = np.log1p(te['Importo iniziale outstanding'].clip(lower=0))
        te['x_imp_log'] = (x_imp_log - self.params['scale_imp'][0]) / self.params['scale_imp'][1]
        g_iscr_log = np.log(te['giorni_da_iscrizione'].clip(lower=1))
        te['giorni_log'] = (g_iscr_log - self.params['scale_iscr'][0]) / self.params['scale_iscr'][1]
        te['giorni_cessione_z'] = (
            (te['giorni_da_cessione'] - self.params['scale_cess'][0]) / self.params['scale_cess'][1]
        )

        keep_cats = [c for c in self.cat_cols if c not in self.params['removed_cats']]
        # Encode against the TRAINING levels. Without this, get_dummies derives
        # the levels from the rows being scored: on a single row the only level
        # is removed by drop_first and every dummy ends up 0 (the baseline).
        # Levels unseen in training become NaN, i.e. all-zero dummies.
        for c in keep_cats:
            te[c] = pd.Categorical(te[c], categories=self.params['levels_map'][c])
        for c in _DERIVED_BINS:
            if c not in keep_cats:
                keep_cats.append(c)

        te = te.drop(columns=['Importo iniziale outstanding', 'giorni_da_iscrizione', 'giorni_da_cessione'])
        te_oh = pd.get_dummies(te, columns=keep_cats, drop_first=True, dtype=float)

        # Align with the training layout: missing columns are filled with 0,
        # columns unknown to training are discarded.
        target_cols_present = [c for c in _TARGET_COLS if c in te_oh.columns]
        return te_oh.reindex(
            columns=self.params['oh_columns'] + target_cols_present, fill_value=0.0
        )

    def active_levels_from_raw(self, raw_row: pd.DataFrame):
        """Return ``{categorical feature: level shown in explanations}`` for one raw row.

        A missing value is reported as the training mode, which is what
        ``preprocess_apply`` imputes. Derived bin features are reported with
        the bin the row actually falls into.
        """
        out = {}
        s = raw_row.iloc[0]
        levels_map = self.params['levels_map']
        for c in self.ORIGINAL_CAT_COLS:
            v = s.get(c, np.nan)
            if pd.isna(v):
                mode = self.params['modes'].get(c, np.nan)
                out[c] = "(baseline)" if pd.isna(mode) else str(mode)
            elif str(v).strip() == "":
                out[c] = levels_map.get(c, ["(baseline)"])[0]
            else:
                out[c] = str(v)

        for name, (src, bins_key, labels_key) in _DERIVED_BINS.items():
            if name not in levels_map:
                continue
            value = pd.to_numeric(pd.Series([s.get(src, np.nan)]), errors="coerce")
            value = value.fillna(self.params['means'].get(src, np.nan))
            label = pd.cut(
                value, np.array(self.params[bins_key], dtype=float),
                labels=self.params[labels_key], right=False, include_lowest=True,
            ).iloc[0]
            out[name] = "(baseline)" if pd.isna(label) else str(label)
        return out

    def collapse_shap(self, vals_row: np.ndarray, feature_names, active_levels):
        """Sum the SHAP values of each categorical feature's dummies into one entry.

        Returns ``(names, values)`` sorted by decreasing absolute value;
        categorical entries are named ``"<feature> = <active level>"``.
        """
        vals_s = pd.Series(vals_row, index=feature_names)
        used = set()
        out_vals = []
        out_names = []
        for cat, levels in self.params['levels_map'].items():
            prefix = f"{cat}_"
            cols = [c for c in feature_names if c.startswith(prefix)]
            if not cols:
                continue
            used.update(cols)
            out_vals.append(float(vals_s[cols].sum()))
            fallback = levels[0] if levels else '(baseline)'
            out_names.append(f"{cat} = {active_levels.get(cat, fallback)}")
        for c in feature_names:
            if c in used or c in _TARGET_COLS:
                continue
            out_vals.append(float(vals_s[c]))
            out_names.append(c)
        out_vals = np.array(out_vals)
        out_names = np.array(out_names)
        idx = np.argsort(-np.abs(out_vals))
        return out_names[idx], out_vals[idx]

    def _explain(self, explainer, X_row, raw_row):
        """Run ``explainer`` on one encoded row and collapse the result per feature."""
        vals = explainer.shap_values(X_row.reshape(1, -1))
        vals_row = vals[0] if hasattr(vals, "__len__") else vals
        return self.collapse_shap(
            vals_row, self.feat_cols_full, self.active_levels_from_raw(raw_row)
        )

    def explain_text_for_stage1(self, X_row, raw_row):
        """Return collapsed SHAP ``(names, values)`` of the stage-1 model for one row."""
        return self._explain(self.explainer_st1, X_row, raw_row)

    def explain_text_for_stage2(self, X_row, raw_row, k_thr: int):
        """Return collapsed SHAP ``(names, values)`` of the stage-2 model for threshold ``k_thr``."""
        return self._explain(self.explainers_st2[k_thr - 1], X_row, raw_row)

    def summary_from_names_contrib(self, names, contrib, top_k=TOP_K_TEXT,
                                   min_abs=MIN_ABS_SHOW, include_neg=False):
        """Select the top positive (and optionally negative) contributions.

        Returns ``(pos_dicts, neg_dicts, pos_pairs, neg_pairs)``; the dicts
        carry ``name`` and the absolute contribution as ``delta_pp``.
        """
        pos = [(n, v) for n, v in zip(names, contrib) if v >= min_abs][:top_k]
        neg = [(n, v) for n, v in zip(names, contrib) if v <= -min_abs][:top_k] if include_neg else []

        def to_dict(items):
            return [{"name": n, "delta_pp": float(abs(v))} for n, v in items]

        return to_dict(pos), to_dict(neg), pos, neg

    def build_one_liner(self, final_class: str, stage_used: str, p100: float, yhat: float,
                        k_thr: int | None, pos_pairs, neg_pairs):
        """Build the one-sentence (Italian) summary of a prediction."""

        def short(items):
            take = items[:TOP_K_ONELINER]
            return ", ".join([f"{n} ({abs(v):.0%} pp)" for n, v in take]) if take else "—"

        up = short([p for p in pos_pairs if p[1] > 0])
        down = short([n for n in neg_pairs if n[1] < 0])
        if stage_used == "stage1":
            return (f"Classe **{final_class}**: p(100%)={p100:.0%}. "
                    f"Hanno favorito: {up}; hanno penalizzato: {down}. "
                    f"Valore atteso {yhat:.1f}.")
        return (f"Classe **{final_class}** (spiegazione su P(y≥{k_thr})): "
                f"in alto {up}; in basso {down}. "
                f"Valore atteso {yhat:.1f}.")

    def _score_payload(self, payload: dict):
        """Preprocess one payload and run both stages.

        Returns ``(raw_frame, X, p100, prob_ord, yhat)`` where ``X`` is the
        1-row feature matrix, ``p100`` the stage-1 probability, ``prob_ord``
        the normalised stage-2 class probabilities and ``yhat`` the expected
        value combining the two.
        """
        raw = {col: payload.get(col, None) for col in FEATURE_MAP.values()}
        df_row_raw = pd.DataFrame([raw])
        te_oh = self.preprocess_apply(df_row_raw)
        X = te_oh.reindex(columns=self.feat_cols_full, fill_value=0.0).values

        p100 = float(self.stage1_final.predict_proba(X)[:, 1][0])
        prob_ord = self.stage2_final.predict_proba(X)[0]
        prob_ord = prob_ord / (prob_ord.sum() or 1.0)
        yhat = 100.0 * p100 + (1.0 - p100) * float(prob_ord @ MIDPOINTS)
        return df_row_raw, X, p100, prob_ord, yhat

    def predict_class_fast(self, payload: dict):
        """Like predict_dict but without SHAP: returns only the class, p100, the ordinal probabilities and the expected value."""
        _, _, p100, prob_ord, yhat = self._score_payload(payload)
        if p100 >= P100_THR_AUTO:
            final_class = "100%"
        else:
            final_class = LABELS[int(np.argmax(prob_ord))]
        return {
            "class": final_class,
            "p100": p100,
            "ordinal_probs": {LABELS[i]: float(prob_ord[i]) for i in range(len(LABELS))},
            "expected_value": float(yhat),
        }

    def predict_dict(self, payload: dict, include_neg: bool = False):
        """Score one payload and explain the decision with SHAP.

        When ``p100 >= P100_THR_AUTO`` the class is ``"100%"`` and the
        stage-1 model is explained; otherwise the class is the most probable
        ordinal level and the stage-2 threshold model ``k_thr`` is explained.
        """
        rid = str(uuid.uuid4())
        df_row_raw, X, p100, prob_ord, yhat = self._score_payload(payload)

        if p100 >= P100_THR_AUTO:
            names, contrib = self.explain_text_for_stage1(X[0], df_row_raw)
            final_class = "100%"
            stage_used = "stage1"
            k_thr = None
        else:
            k = int(np.argmax(prob_ord))
            # There is no threshold model for k = 0, so the lowest class is
            # explained through the first threshold, P(y >= 1).
            k_thr = min(max(1, k), 3)
            names, contrib = self.explain_text_for_stage2(X[0], df_row_raw, k_thr=k_thr)
            final_class = LABELS[k]
            stage_used = "stage2"

        txt_pos, txt_neg, pos_pairs, neg_pairs = self.summary_from_names_contrib(
            names, contrib, include_neg=include_neg
        )
        one_liner = self.build_one_liner(
            final_class, stage_used, p100, yhat, k_thr, pos_pairs, neg_pairs
        )
        return {
            "request_id": rid,
            "stage_used": stage_used,
            "class": final_class,
            "p100": p100,
            "expected_value": yhat,
            "ordinal_probs": {LABELS[i]: float(prob_ord[i]) for i in range(len(LABELS))},
            "k_thr": k_thr,
            "shap": {
                "positivi_top": txt_pos,  # probability points (0..1)
                "negativi_top": txt_neg if include_neg else [],
            },
            "one_liner": one_liner,
        }