| """ |
| feature_builder.py — Converts raw rule-engine output dicts into a clean |
| feature vector for the ML model. Single responsibility: no model logic here. |
| |
| Design decisions: |
| - All bool features cast to int (0/1) — LGBM handles natively but this |
| keeps the matrix dtype homogeneous. |
| - Engineered interaction terms computed here, not in regime/volume modules, |
| to keep those modules free of ML concerns. |
| - Returns a dict (for inference) or DataFrame row (for training). |
| - FEATURE_COLUMNS from ml_config defines the canonical order — any missing |
| feature raises KeyError immediately rather than silently producing NaN. |
| """ |
|
|
| import math |
| from typing import Dict, Any |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from ml_config import FEATURE_COLUMNS |
|
|
|
|
def build_feature_dict(
    regime_data: Dict[str, Any],
    volume_data: Dict[str, Any],
    scores: Dict[str, Any],
) -> Dict[str, float]:
    """
    Assemble the model's feature mapping from the rule-engine outputs.

    Raw values are read from ``regime_data``, ``volume_data`` and ``scores``
    with ``.get``; an absent key falls back to a default (0.0 for floats,
    0 for counts, False for flags, 1.0 for ``vol_ratio``). Numbers are
    coerced with ``float()`` / ``int()`` and flags with ``int(bool(...))``.
    Derived directional-index and interaction features are computed on top
    of those raw values.

    Returns:
        A dict holding exactly the ``FEATURE_COLUMNS`` entries, in that order.

    Raises:
        KeyError: if ``FEATURE_COLUMNS`` lists a feature not produced here.
    """

    def _num(source: Dict[str, Any], key: str, default: float = 0.0) -> float:
        # Float coercion with a default for absent keys.
        return float(source.get(key, default))

    def _flag(source: Dict[str, Any], key: str) -> int:
        # Truthiness collapsed to 0/1; absent keys count as False.
        return int(bool(source.get(key, False)))

    features: Dict[str, float] = {}

    # Directional movement.
    for key in ("adx", "di_plus", "di_minus"):
        features[key] = _num(regime_data, key)
    plus, minus = features["di_plus"], features["di_minus"]
    features["di_diff"] = plus - minus
    # The 1e-9 term keeps the division defined when both DI values are zero.
    features["di_ratio"] = plus / (plus + minus + 1e-9)

    # Volatility.
    features["atr_pct"] = _num(regime_data, "atr_pct")
    features["vol_ratio"] = _num(regime_data, "vol_ratio", 1.0)
    for key in ("vol_compressed", "vol_expanding", "vol_expanding_from_base"):
        features[key] = _flag(regime_data, key)

    # Volume behaviour.
    for key in ("absorption", "failed_breakout"):
        features[key] = _flag(volume_data, key)
    features["recent_failed_count"] = int(volume_data.get("recent_failed_count", 0))
    features["obv_slope_norm"] = _num(volume_data, "obv_slope_norm")
    features["delta_sign"] = int(volume_data.get("delta_sign", 0))
    for key in ("spike", "climax"):
        features[key] = _flag(volume_data, key)

    # Distance, signed and absolute.
    features["dist_atr"] = _num(regime_data, "dist_atr")
    features["dist_atr_abs"] = abs(features["dist_atr"])

    # Rule-engine confidence and scores.
    features["regime_confidence"] = _num(regime_data, "regime_confidence")
    for key in (
        "regime_score",
        "volume_score",
        "structure_score",
        "confidence_score",
        "total_score",
    ):
        features[key] = _num(scores, key)

    # Interaction terms.
    features["adx_x_regime"] = features["adx"] * features["regime_score"]
    features["vol_x_obv"] = features["vol_ratio"] * features["obv_slope_norm"]
    features["score_x_conf"] = features["total_score"] * features["regime_confidence"]

    # Fail loudly if the canonical column list expects something not built here.
    missing = set(FEATURE_COLUMNS) - set(features)
    if missing:
        raise KeyError(f"Missing features: {missing}")

    # Emit in canonical order, dropping anything FEATURE_COLUMNS does not list.
    return {name: features[name] for name in FEATURE_COLUMNS}
|
|
|
|
def feature_dict_to_row(feat: Dict[str, float]) -> pd.Series:
    """
    Turn a feature dict into a pandas Series indexed by FEATURE_COLUMNS.

    Entries are taken in FEATURE_COLUMNS order; keys of ``feat`` that are not
    listed there are ignored.

    Raises:
        KeyError: if ``feat`` lacks any FEATURE_COLUMNS entry.
    """
    ordered: Dict[str, float] = {}
    for column in FEATURE_COLUMNS:
        ordered[column] = feat[column]
    return pd.Series(ordered)
|
|
|
|
def feature_dict_to_matrix(feat: Dict[str, float]) -> np.ndarray:
    """
    Pack one feature dict into a float64 array of shape (1, n_features).

    Values are laid out in FEATURE_COLUMNS order, so the column positions
    follow the canonical feature list.

    Raises:
        KeyError: if ``feat`` lacks any FEATURE_COLUMNS entry.
    """
    row_values = [feat[column] for column in FEATURE_COLUMNS]
    # Wrapping the row in an outer list yields the single-sample 2-D shape.
    return np.array([row_values], dtype=np.float64)
|
|
|
|
def validate_features(feat: Dict[str, float]) -> bool:
    """
    Return True if every FEATURE_COLUMNS entry is present and finite.

    A feature fails validation when it is missing, ``None``, NaN, +/-inf,
    or not a real number at all. Finiteness is tested with ``math.isfinite``
    on every value rather than only on ``float`` instances, so NaN/inf held
    in numeric types that do not subclass ``float`` is caught as well.
    """
    for name in FEATURE_COLUMNS:
        value = feat.get(name)
        if value is None:
            return False
        try:
            if not math.isfinite(value):
                return False
        except OverflowError:
            # An int too large to convert to float is still a finite number.
            continue
        except (TypeError, ValueError):
            # Not interpretable as a real number, so it cannot be finite.
            return False
    return True
|
|