Spaces:
Sleeping
Sleeping
import numpy as np, pandas as pd, warnings, time, uuid | |
warnings.filterwarnings("ignore") | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.base import BaseEstimator, ClassifierMixin | |
import xgboost as xgb | |
import shap | |
# -------------------- CONFIG -------------------- | |
DATA_PATH = "/app/data/final_report.csv" # <— assicurati che il file esista nel container! | |
FEATURE_MAP = { | |
"Debitore_cluster": "Debitore_cluster", | |
"Stato_Giudizio": "Stato_Giudizio", | |
"Cedente": "Cedente", | |
"Importo.iniziale.outstanding": "Importo iniziale outstanding", | |
"Decreto.sospeso": "Decreto sospeso", | |
"Notifica.Decreto": "Notifica Decreto", | |
"Opposizione.al.decreto.ingiuntivo": "Opposizione al decreto ingiuntivo", | |
"Ricorso.al.TAR": "Ricorso al TAR", | |
"Sentenza.TAR": "Sentenza TAR", | |
"Atto.di.Precetto": "Atto di Precetto", | |
"Decreto.Ingiuntivo": "Decreto Ingiuntivo", | |
"Sentenza.giudizio.opposizione": "Sentenza giudizio opposizione", | |
"giorni_da_iscrizione": "giorni_da_iscrizione", | |
"giorni_da_cessione": "giorni_da_cessione", | |
"Zona": "Zona" | |
} | |
LABELS = ["quasi_nulla","bassa","media","alta"] | |
BINS = [0, 11, 30, 70, 100] | |
MIDPOINTS = np.array([5.5, 20.5, 50.0, 85.0]) | |
MONTH_BINS_DAYS = np.array([0, 30, 60, 90, 180, 360, 720, 1e9], dtype=float) | |
MONTH_LABELS = ["<1m","1–2m","2–3m","3–6m","6–12m","12–24m",">=24m"] | |
IMPORTO_BINS = [0.0,1_000.0,10_000.0,50_000.0,100_000.0,500_000.0,1_000_000.0,2_000_000.0] | |
IMPORTO_LABELS = ["<1k","1–10k","10–50k","50–100k","100–500k","500k–1M",">=1M"] | |
RANDOM_STATE = 42 | |
P100_THR_AUTO = 0.71 | |
STAGE1_LOGIT_PARAMS = dict(max_iter=500, solver='liblinear') | |
STAGE2_ORD_XGB_PARAMS = dict( | |
objective="binary:logistic", n_estimators=700, learning_rate=0.05, | |
max_depth=4, subsample=0.9, colsample_bytree=0.85, | |
min_child_weight=2.0, gamma=0.1, reg_lambda=5.0, reg_alpha=0.5, | |
n_jobs=-1, random_state=RANDOM_STATE, verbosity=0 | |
) | |
TOP_K_TEXT = 3 | |
MIN_ABS_SHOW = 0.01 | |
TOP_K_ONELINER = 2 | |
# -------------------- MODELLO ORDINATO -------------------- | |
class OrdinalXGB(BaseEstimator, ClassifierMixin): | |
def __init__(self, n_classes=4, **xgb_params): | |
self.n_classes = n_classes | |
self.xgb_params = xgb_params | |
self.models = [] | |
def fit(self, X, y, sample_weight=None): | |
self.models = [] | |
for k in range(1, self.n_classes): | |
y_bin = (y >= k).astype(int) | |
clf = xgb.XGBClassifier(**self.xgb_params) | |
clf.fit(X, y_bin, sample_weight=sample_weight) | |
self.models.append(clf) | |
return self | |
def _cum_probs(self, X): | |
cps = np.vstack([clf.predict_proba(X)[:,1] for clf in self.models]).T | |
cps = np.clip(cps, 1e-6, 1-1e-6) | |
for k in range(1, cps.shape[1]): cps[:,k] = np.minimum(cps[:,k-1], cps[:,k]) | |
return cps | |
def predict_proba(self, X): | |
cps = self._cum_probs(X); n = X.shape[0] | |
proba = np.zeros((n, self.n_classes)) | |
proba[:,0] = 1 - cps[:,0] | |
for k in range(1, self.n_classes-1): proba[:,k] = cps[:,k-1] - cps[:,k] | |
proba[:,-1] = cps[:,-1] | |
s = proba.sum(axis=1, keepdims=True); s[s==0]=1.0 | |
return np.clip(proba/s, 0, 1) | |
def mode_(s: pd.Series): | |
s = s.dropna() | |
return s.mode().iloc[0] if len(s) else np.nan | |
class Predictor: | |
def __init__(self, data_path=DATA_PATH): | |
t0 = time.time() | |
self.data_path = data_path | |
df = pd.read_csv(self.data_path) | |
inc = df['incassi_perc'].replace([np.inf,-np.inf], np.nan).fillna(100.0).clip(0,100) | |
df_model = df[[v for v in FEATURE_MAP.values() if v in df.columns]].copy() | |
df_model['incassi_perc_capped'] = inc | |
df_model['y100'] = (inc >= 100.0-1e-9).astype(int) | |
df_model['livello'] = pd.cut(np.minimum(inc, 99.999), bins=BINS, labels=LABELS, right=False, include_lowest=True) | |
self.num_cols, self.cat_cols = [], [] | |
for c in FEATURE_MAP.values(): | |
if c in df_model.columns: | |
(self.num_cols if pd.api.types.is_numeric_dtype(df_model[c]) else self.cat_cols).append(c) | |
self.params, full_oh = self.preprocess_fit(df_model) | |
self.feat_cols_full = [c for c in full_oh.columns if c not in ['incassi_perc_capped','y100','livello']] | |
self.stage1_final = LogisticRegression(**STAGE1_LOGIT_PARAMS).fit(full_oh[self.feat_cols_full], full_oh['y100']) | |
full_lt = full_oh[full_oh['y100']==0].copy() | |
y_ord_full = pd.Categorical(full_lt['livello'], categories=LABELS, ordered=True).codes | |
self.stage2_final = OrdinalXGB(n_classes=4, **STAGE2_ORD_XGB_PARAMS).fit(full_lt[self.feat_cols_full].values, y_ord_full) | |
shap.initjs() | |
rng = np.random.RandomState(0) | |
bg_idx = rng.choice(len(full_oh), size=min(200, len(full_oh)), replace=False) | |
bg_matrix = full_oh.iloc[bg_idx][self.feat_cols_full].values | |
self.explainer_st1 = shap.LinearExplainer(self.stage1_final, bg_matrix, link=shap.links.identity) | |
self.explainers_st2 = [shap.TreeExplainer(clf, bg_matrix, model_output="probability", | |
feature_perturbation="interventional") | |
for clf in self.stage2_final.models] | |
self.ORIGINAL_CAT_COLS = [c for c in self.cat_cols] | |
self.load_seconds = time.time()-t0 | |
def preprocess_fit(self, train_df: pd.DataFrame): | |
params = {} | |
means = {c: train_df[c].mean(skipna=True) for c in self.num_cols} | |
modes = {c: mode_(train_df[c]) for c in self.cat_cols} | |
tr = train_df.copy() | |
for c in self.num_cols: tr[c] = tr[c].fillna(means[c]) | |
for c in self.cat_cols: tr[c] = tr[c].fillna(modes[c]).astype(str) | |
one_level = [c for c in self.cat_cols if tr[c].nunique(dropna=True) < 2] | |
keep_cats = [c for c in self.cat_cols if c not in one_level] | |
params['removed_cats'] = one_level | |
params['month_bins_days'] = MONTH_BINS_DAYS.tolist() | |
params['month_labels'] = MONTH_LABELS | |
tr['iscr_month_bin'] = pd.cut(tr['giorni_da_iscrizione'], MONTH_BINS_DAYS, labels=MONTH_LABELS, right=False, include_lowest=True) | |
tr['cess_month_bin'] = pd.cut(tr['giorni_da_cessione'], MONTH_BINS_DAYS, labels=MONTH_LABELS, right=False, include_lowest=True) | |
for c in ['iscr_month_bin','cess_month_bin']: | |
if tr[c].nunique(dropna=True) >= 2 and c not in keep_cats: | |
keep_cats.append(c) | |
params['importo_bins'] = IMPORTO_BINS | |
params['importo_labels'] = IMPORTO_LABELS | |
tr['imp_bucket'] = pd.cut(tr['Importo iniziale outstanding'], IMPORTO_BINS, labels=IMPORTO_LABELS, right=False, include_lowest=True) | |
if tr['imp_bucket'].nunique(dropna=True) >= 2 and 'imp_bucket' not in keep_cats: | |
keep_cats.append('imp_bucket') | |
params['keep_cats'] = keep_cats | |
params['levels_map'] = {c: sorted(tr[c].astype(str).dropna().unique().tolist()) for c in keep_cats} | |
x_imp_log = np.log1p(tr['Importo iniziale outstanding'].clip(lower=0)) | |
params['scale_imp'] = (x_imp_log.mean(), x_imp_log.std(ddof=0) or 1.0) | |
tr['x_imp_log'] = (x_imp_log - params['scale_imp'][0]) / params['scale_imp'][1] | |
g_iscr_log = np.log(tr['giorni_da_iscrizione'].clip(lower=1)) | |
params['scale_iscr'] = (g_iscr_log.mean(), g_iscr_log.std(ddof=0) or 1.0) | |
tr['giorni_log'] = (g_iscr_log - params['scale_iscr'][0]) / params['scale_iscr'][1] | |
g_cess = tr['giorni_da_cessione'] | |
params['scale_cess'] = (g_cess.mean(), g_cess.std(ddof=0) or 1.0) | |
tr['giorni_cessione_z'] = (g_cess - params['scale_cess'][0]) / params['scale_cess'][1] | |
tr = tr.drop(columns=['Importo iniziale outstanding','giorni_da_iscrizione','giorni_da_cessione']) | |
tr_oh = pd.get_dummies(tr, columns=keep_cats, drop_first=True, dtype=float) | |
params['oh_columns'] = [c for c in tr_oh.columns if c not in ['incassi_perc_capped','y100','livello']] | |
params['means'] = means | |
params['modes'] = modes | |
return params, tr_oh | |
def preprocess_apply(self, test_df: pd.DataFrame): | |
te = test_df.copy() | |
for c in self.num_cols: te[c] = te[c].fillna(self.params['means'][c]) | |
for c in self.cat_cols: te[c] = te[c].fillna(self.params['modes'][c]).astype(str) | |
te = te.drop(columns=self.params['removed_cats'], errors='ignore') | |
te['iscr_month_bin'] = pd.cut(te['giorni_da_iscrizione'], np.array(self.params['month_bins_days'], float), | |
labels=self.params['month_labels'], right=False, include_lowest=True) | |
te['cess_month_bin'] = pd.cut(te['giorni_da_cessione'], np.array(self.params['month_bins_days'], float), | |
labels=self.params['month_labels'], right=False, include_lowest=True) | |
te['imp_bucket'] = pd.cut(te['Importo iniziale outstanding'], np.array(self.params['importo_bins'], float), | |
labels=self.params['importo_labels'], right=False, include_lowest=True) | |
x_imp_log = np.log1p(te['Importo iniziale outstanding'].clip(lower=0)) | |
te['x_imp_log'] = (x_imp_log - self.params['scale_imp'][0]) / self.params['scale_imp'][1] | |
g_iscr_log = np.log(te['giorni_da_iscrizione'].clip(lower=1)) | |
te['giorni_log'] = (g_iscr_log - self.params['scale_iscr'][0]) / self.params['scale_iscr'][1] | |
te['giorni_cessione_z'] = (te['giorni_da_cessione'] - self.params['scale_cess'][0]) / self.params['scale_cess'][1] | |
keep_cats = [c for c in self.cat_cols if c not in self.params['removed_cats']] | |
for c in ['iscr_month_bin','cess_month_bin','imp_bucket']: | |
if c not in keep_cats: keep_cats.append(c) | |
te = te.drop(columns=['Importo iniziale outstanding','giorni_da_iscrizione','giorni_da_cessione']) | |
te_oh = pd.get_dummies(te, columns=keep_cats, drop_first=True, dtype=float) | |
for col in self.params['oh_columns']: | |
if col not in te_oh.columns: te_oh[col] = 0.0 | |
extra = [c for c in te_oh.columns if c not in self.params['oh_columns'] + ['incassi_perc_capped','y100','livello']] | |
if extra: te_oh = te_oh.drop(columns=extra) | |
target_cols_all = ['incassi_perc_capped','y100','livello'] | |
target_cols_present = [c for c in target_cols_all if c in te_oh.columns] | |
te_oh = te_oh[self.params['oh_columns'] + target_cols_present] | |
return te_oh | |
def active_levels_from_raw(self, raw_row: pd.DataFrame): | |
out = {} | |
s = raw_row.iloc[0] | |
for c in self.ORIGINAL_CAT_COLS: | |
v = s.get(c, np.nan) | |
out[c] = self.params['levels_map'].get(c, ["(baseline)"])[0] if (pd.isna(v) or str(v).strip()=="") else str(v) | |
return out | |
def collapse_shap(self, vals_row: np.ndarray, feature_names, active_levels): | |
vals_s = pd.Series(vals_row, index=feature_names) | |
used=set(); out_vals=[]; out_names=[] | |
for cat, levels in self.params['levels_map'].items(): | |
prefix=f"{cat}_"; cols=[c for c in feature_names if c.startswith(prefix)] | |
if not cols: continue | |
used.update(cols) | |
total=float(vals_s[cols].sum()) | |
out_vals.append(total); out_names.append(f"{cat} = {active_levels.get(cat, levels[0] if levels else '(baseline)')}") | |
for c in feature_names: | |
if c in used or c in ["incassi_perc_capped","y100","livello"]: continue | |
out_vals.append(float(vals_s[c])); out_names.append(c) | |
out_vals=np.array(out_vals); out_names=np.array(out_names) | |
idx=np.argsort(-np.abs(out_vals)) | |
return out_names[idx], out_vals[idx] | |
def explain_text_for_stage1(self, X_row, raw_row): | |
vals = self.explainer_st1.shap_values(X_row.reshape(1,-1)) | |
vals_row = vals[0] if hasattr(vals, "__len__") else vals | |
return self.collapse_shap(vals_row, self.feat_cols_full, self.active_levels_from_raw(raw_row)) | |
def explain_text_for_stage2(self, X_row, raw_row, k_thr: int): | |
vals = self.explainers_st2[k_thr-1].shap_values(X_row.reshape(1,-1)) | |
vals_row = vals[0] if hasattr(vals, "__len__") else vals | |
return self.collapse_shap(vals_row, self.feat_cols_full, self.active_levels_from_raw(raw_row)) | |
def summary_from_names_contrib(self, names, contrib, top_k=TOP_K_TEXT, min_abs=MIN_ABS_SHOW, include_neg=False): | |
pos = [(n, v) for n, v in zip(names, contrib) if v >= min_abs][:top_k] | |
neg = [(n, v) for n, v in zip(names, contrib) if v <= -min_abs][:top_k] if include_neg else [] | |
def to_dict(items): return [ {"name": n, "delta_pp": float(abs(v))} for n, v in items ] | |
return to_dict(pos), to_dict(neg), pos, neg | |
def build_one_liner(self, final_class: str, stage_used: str, p100: float, yhat: float, | |
k_thr: int | None, pos_pairs, neg_pairs): | |
def short(items): | |
take = items[:TOP_K_ONELINER] | |
return ", ".join([f"{n} ({abs(v):.0%} pp)" for n, v in take]) if take else "—" | |
if stage_used == "stage1": | |
up = short([p for p in pos_pairs if p[1] > 0]) | |
down = short([n for n in neg_pairs if n[1] < 0]) | |
return (f"Classe **{final_class}**: p(100%)={p100:.0%}. " | |
f"Hanno favorito: {up}; hanno penalizzato: {down}. " | |
f"Valore atteso {yhat:.1f}.") | |
else: | |
up = short([p for p in pos_pairs if p[1] > 0]) | |
down = short([n for n in neg_pairs if n[1] < 0]) | |
return (f"Classe **{final_class}** (spiegazione su P(y≥{k_thr})): " | |
f"in alto {up}; in basso {down}. Valore atteso {yhat:.1f}.") | |
def predict_class_fast(self, payload: dict): | |
"""Like predict_dict ma senza SHAP: restituisce solo classe, p100, probs ordinali e valore atteso.""" | |
raw = {k: payload.get(k, None) for k in FEATURE_MAP.values()} | |
df_row_raw = pd.DataFrame([raw]) | |
te_oh = self.preprocess_apply(df_row_raw) | |
X_df = te_oh.reindex(columns=self.feat_cols_full, fill_value=0.0) | |
X = X_df.values | |
p100 = float(self.stage1_final.predict_proba(X)[:,1][0]) | |
prob_ord = self.stage2_final.predict_proba(X)[0] | |
prob_ord = prob_ord / (prob_ord.sum() or 1.0) | |
yhat = 100.0*p100 + (1.0-p100)*float((prob_ord @ MIDPOINTS)) | |
if p100 >= P100_THR_AUTO: | |
final_class = "100%" | |
else: | |
k = int(np.argmax(prob_ord)) | |
final_class = LABELS[k] | |
return { | |
"class": final_class, | |
"p100": p100, | |
"ordinal_probs": {LABELS[i]: float(prob_ord[i]) for i in range(len(LABELS))}, | |
"expected_value": float(yhat) | |
} | |
def predict_dict(self, payload: dict, include_neg: bool=False): | |
rid = str(uuid.uuid4()) | |
raw = {k: payload.get(k, None) for k in FEATURE_MAP.values()} | |
df_row_raw = pd.DataFrame([raw]) | |
te_oh = self.preprocess_apply(df_row_raw) | |
X_df = te_oh.reindex(columns=self.feat_cols_full, fill_value=0.0) | |
X = X_df.values | |
p100 = float(self.stage1_final.predict_proba(X)[:,1][0]) | |
prob_ord = self.stage2_final.predict_proba(X)[0] | |
prob_ord = prob_ord / (prob_ord.sum() or 1.0) | |
yhat = 100.0*p100 + (1.0-p100)*float((prob_ord @ MIDPOINTS)) | |
if p100 >= P100_THR_AUTO: | |
names, contrib = self.explain_text_for_stage1(X[0], df_row_raw) | |
txt_pos, txt_neg, pos_pairs, neg_pairs = self.summary_from_names_contrib( | |
names, contrib, include_neg=include_neg | |
) | |
final_class = "100%" | |
stage_used = "stage1" | |
k_thr = None | |
else: | |
k = int(np.argmax(prob_ord)); k_thr = min(max(1, k), 3) | |
names, contrib = self.explain_text_for_stage2(X[0], df_row_raw, k_thr=k_thr) | |
txt_pos, txt_neg, pos_pairs, neg_pairs = self.summary_from_names_contrib( | |
names, contrib, include_neg=include_neg | |
) | |
final_class = ["quasi_nulla","bassa","media","alta"][k] | |
stage_used = "stage2" | |
one_liner = self.build_one_liner(final_class, stage_used, p100, yhat, k_thr, pos_pairs, neg_pairs) | |
return { | |
"request_id": rid, | |
"stage_used": stage_used, | |
"class": final_class, | |
"p100": p100, | |
"expected_value": yhat, | |
"ordinal_probs": {LABELS[i]: float(prob_ord[i]) for i in range(len(LABELS))}, | |
"k_thr": k_thr, | |
"shap": { | |
"positivi_top": txt_pos, # punti di probabilità (0..1) | |
"negativi_top": txt_neg if include_neg else [] | |
}, | |
"one_liner": one_liner | |
} | |