| """ |
| model_backend.py β Gradient boosting abstraction for LightGBM / sklearn HGBM. |
| |
| LightGBM (preferred): |
| pip install lightgbm |
| Set USE_LIGHTGBM = True below. |
| |
| Fallback: sklearn HistGradientBoostingClassifier. |
| Same algorithm family, native NaN support, comparable speed. |
| Feature importances use permutation importance (val set). |
| |
| Interface is identical regardless of backend: |
| .fit() β trains + calibrates |
| .predict_win_prob() β P(win) per row |
| .feature_importances_ β normalized importance array |
| """ |
|
|
import numpy as np
|
|
try:
    import lightgbm as lgb
    _LGBM_AVAILABLE = True
except ImportError:
    _LGBM_AVAILABLE = False
|
|
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.inspection import permutation_importance
|
|
# Flip to True (with lightgbm installed) to prefer the LightGBM backend.
USE_LIGHTGBM = False
|
|
|
|
def _build_lgbm(p: dict):
    # Maps the shared param dict onto LightGBM's sklearn-API names.
    # Early stopping is attached as a callback in ModelBackend.fit(),
    # since it requires an eval_set at fit time.
    return lgb.LGBMClassifier(
        n_estimators      = p.get("n_estimators", 400),
        learning_rate     = p.get("learning_rate", 0.03),
        max_depth         = p.get("max_depth", 5),
        min_child_samples = p.get("min_samples_leaf", 40),
        reg_lambda        = p.get("l2_regularization", 2.0),
        colsample_bytree  = p.get("max_features", 0.70),  # sklearn-API name for feature_fraction
        subsample         = 0.80,
        subsample_freq    = 1,   # subsample only takes effect when subsample_freq > 0
        n_jobs            = -1,
        random_state      = p.get("random_state", 42),
        verbosity         = -1,
        objective         = "binary",
        metric            = "binary_logloss",
    )
|
|
|
|
def _build_hgbm(p: dict):
    # Unlike LightGBM, HGBM early-stops on its own internal split
    # (validation_fraction), not on the externally supplied val set.
    # max_features requires scikit-learn >= 1.4.
    return HistGradientBoostingClassifier(
        max_iter            = p.get("n_estimators", 400),
        learning_rate       = p.get("learning_rate", 0.03),
        max_depth           = p.get("max_depth", 5),
        min_samples_leaf    = p.get("min_samples_leaf", 40),
        l2_regularization   = p.get("l2_regularization", 2.0),
        max_features        = p.get("max_features", 0.70),
        early_stopping      = True,
        validation_fraction = p.get("validation_fraction", 0.15),
        n_iter_no_change    = p.get("n_iter_no_change", 30),
        random_state        = p.get("random_state", 42),
        verbose             = 0,
    )
|
|
|
|
class ModelBackend:
    """
    Unified classifier. After fit():
        .predict_proba(X)     -> (N, 2) array
        .predict_win_prob(X)  -> (N,) array of P(win)
        .feature_importances_ -> (n_features,) normalized importances
        .n_iter_              -> actual boosting rounds used
    """

    def __init__(self, params: dict, calibrate: bool = True):
        self.params = params
        self.calibrate = calibrate
        self._base = None
        self._model = None
        self.feature_importances_: np.ndarray = np.array([])
        self.n_iter_: int = 0
        self._backend_name = "lightgbm" if (USE_LIGHTGBM and _LGBM_AVAILABLE) else "hgbm"
|
|
    @property
    def backend_name(self) -> str:
        return self._backend_name
|
|
    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: np.ndarray = None,
        y_val: np.ndarray = None,
        sample_weight: np.ndarray = None,
    ) -> "ModelBackend":
        sw = sample_weight

        if self._backend_name == "lightgbm":
            self._base = _build_lgbm(self.params)
            kw = {}
            if X_val is not None and y_val is not None:
                kw["eval_set"] = [(X_val, y_val)]
                # Early stopping needs an eval_set, so attach it only here.
                kw["callbacks"] = [lgb.early_stopping(
                    self.params.get("early_stopping_rounds", 30), verbose=False)]
            if sw is not None:
                kw["sample_weight"] = sw
            self._base.fit(X_train, y_train, **kw)
            # best_iteration_ is None when early stopping never triggered.
            best = getattr(self._base, "best_iteration_", None)
            self.n_iter_ = int(best) if best else self.params.get("n_estimators", 400)
        else:
            # HGBM early-stops on its internal validation_fraction split;
            # X_val/y_val are used later for calibration and importances.
            self._base = _build_hgbm(self.params)
            kw = {}
            if sw is not None:
                kw["sample_weight"] = sw
            self._base.fit(X_train, y_train, **kw)
            self.n_iter_ = int(getattr(self._base, "n_iter_", self.params.get("n_estimators", 400)))
|
|
        # Calibrate the already-fitted model on the held-out val set.
        # cv="prefit" fits only the isotonic map; cv=5 would refit clones
        # of the model on X_val alone, discarding the training run.
        # (On scikit-learn >= 1.6, wrap the estimator in
        # sklearn.frozen.FrozenEstimator instead of passing cv="prefit".)
        if (self.calibrate and X_val is not None and y_val is not None and
                len(X_val) >= 50 and len(np.unique(y_val)) == 2):
            cal = CalibratedClassifierCV(self._base, method="isotonic", cv="prefit")
            cal.fit(X_val, y_val)
            self._model = cal
        else:
            self._model = self._base

        self._compute_importances(X_val, y_val)
        return self
|
|
    def _compute_importances(self, X_val: np.ndarray = None, y_val: np.ndarray = None):
        base = self._base
        if base is None:
            return

        # LightGBM exposes built-in (split-count) importances.
        if hasattr(base, "feature_importances_"):
            imp = np.array(base.feature_importances_, dtype=np.float64)
        # HGBM has none: fall back to permutation importance on the val
        # set when it is large enough to be informative.
        elif X_val is not None and y_val is not None and len(X_val) >= 20:
            result = permutation_importance(
                base, X_val, y_val,
                n_repeats=5,
                random_state=42,
                n_jobs=-1,
            )
            imp = np.maximum(result.importances_mean, 0.0)
        else:
            # No usable val set: default to uniform importances.
            n_feat = getattr(base, "n_features_in_", 1)
            imp = np.ones(n_feat, dtype=np.float64)

        # Normalize to sum to 1 (guard against an all-zero vector).
        total = imp.sum()
        self.feature_importances_ = imp / total if total > 0 else imp
|
|
    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        if self._model is None:
            raise RuntimeError("Call .fit() before .predict_proba().")
        return self._model.predict_proba(X)

    def predict_win_prob(self, X: np.ndarray) -> np.ndarray:
        """Return 1-D array of P(win) for each row."""
        return self.predict_proba(X)[:, 1]
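

if __name__ == "__main__":
    # Minimal smoke test -- a sketch only: synthetic data from sklearn's
    # make_classification stands in for real features/labels.
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split

    X, y = make_classification(n_samples=2000, n_features=12, random_state=0)
    X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.25, random_state=0)

    backend = ModelBackend({"n_estimators": 200, "learning_rate": 0.05})
    backend.fit(X_tr, y_tr, X_val=X_val, y_val=y_val)
    print(backend.backend_name, backend.n_iter_)
    print(backend.predict_win_prob(X_val[:5]))
    print(backend.feature_importances_.round(3))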
|
|