| """ |
| Meta-feature extraction module for discrete observational datasets. |
| |
| Extracts ~34 features across 5 categories: |
| - Tier 1: Basic descriptors (6 features) |
| - Tier 2: Information-theoretic (8 features) |
| - Tier 3: Dependency structure (8 features) |
| - Tier 4: CI test landmark probes (6 features) |
| - Tier 5: Distribution shape (6 features) |
| """ |
| import numpy as np |
| import pandas as pd |
| from scipy.stats import entropy, chi2_contingency |
| from itertools import combinations |
| import warnings |
| import logging |
|
|
| warnings.filterwarnings('ignore') |
| logger = logging.getLogger(__name__) |
|
|
|
|
def extract_all_features(df, n_probe_triplets=100, alpha=0.05):
    """Extract the full meta-feature set from a discrete dataset.

    Runs all five tier extractors and merges their outputs into a single
    flat dictionary.

    Args:
        df: pd.DataFrame with integer-encoded discrete columns
        n_probe_triplets: number of random (X,Y,Z) triplets for CI probes
        alpha: significance level for dependency tests

    Returns:
        dict of feature_name -> float
    """
    # Each tier extractor returns an independent dict; merge in tier order.
    tier_outputs = (
        _basic_features(df),
        _info_theory_features(df),
        _dependency_features(df, alpha=alpha),
        _ci_probe_features(df, n_probes=n_probe_triplets, alpha=alpha),
        _distribution_features(df),
    )

    merged = {}
    for tier in tier_outputs:
        merged.update(tier)
    return merged
|
|
|
|
| def _basic_features(df): |
| """Tier 1: Basic dataset descriptors.""" |
| n_samples, n_vars = df.shape |
| cardinalities = df.nunique().values |
| |
| return { |
| 'n_samples': n_samples, |
| 'n_variables': n_vars, |
| 'n_over_p': n_samples / max(n_vars, 1), |
| 'avg_cardinality': cardinalities.mean(), |
| 'max_cardinality': cardinalities.max(), |
| 'min_cardinality': cardinalities.min(), |
| } |
|
|
|
|
def _info_theory_features(df):
    """Tier 2: Information-theoretic features.

    Computes per-variable Shannon entropies and (normalized) mutual
    information over a sample of column pairs.

    Args:
        df: pd.DataFrame with discrete columns.

    Returns:
        dict of feature_name -> float

    Note:
        With fewer than 2 columns there are no pairs; the MI statistics
        fall back to neutral zeros instead of raising (previously
        `np.array([]).max()` raised ValueError). This mirrors the
        fallback convention used by `_dependency_features`.
    """
    n, p = df.shape

    # Per-variable Shannon entropy (nats, via scipy.stats.entropy on the
    # normalized value counts).
    entropies = []
    for col in df.columns:
        vc = df[col].value_counts(normalize=True)
        entropies.append(entropy(vc))
    entropies = np.array(entropies)
    if entropies.size == 0:
        # Zero-column frame: keep the entropy summaries well-defined.
        entropies = np.array([0.0])

    # All unordered column pairs, capped at 500 for tractability on wide data.
    all_pairs = list(combinations(range(p), 2))
    max_pairs = min(len(all_pairs), 500)
    if len(all_pairs) > max_pairs:
        rng = np.random.RandomState(42)  # fixed seed: reproducible features
        pair_indices = rng.choice(len(all_pairs), max_pairs, replace=False)
        pairs = [all_pairs[i] for i in pair_indices]
    else:
        pairs = all_pairs

    mis = []
    norm_mis = []
    for i, j in pairs:
        mi = _mutual_information(df.iloc[:, i].values, df.iloc[:, j].values)
        mis.append(mi)

        # Normalized MI: MI / sqrt(H(X) * H(Y)); 0 when either entropy ~ 0.
        denom = np.sqrt(entropies[i] * entropies[j])
        norm_mis.append(mi / denom if denom > 1e-10 else 0.0)

    # Neutral fallbacks when p < 2 (no pairs exist): avoids ValueError from
    # reducing an empty array and NaN means.
    mis = np.array(mis) if mis else np.array([0.0])
    norm_mis = np.array(norm_mis) if norm_mis else np.array([0.0])

    return {
        'mean_entropy': entropies.mean(),
        'std_entropy': entropies.std(),
        'max_entropy': entropies.max(),
        'mean_pairwise_MI': mis.mean(),
        'std_pairwise_MI': mis.std(),
        'max_pairwise_MI': mis.max(),
        'mean_normalized_MI': norm_mis.mean(),
        'frac_high_MI_pairs': (mis > 0.05).mean(),
    }
|
|
|
|
| def _dependency_features(df, alpha=0.05): |
| """Tier 3: Dependency structure features via chi-squared tests.""" |
| n, p = df.shape |
| cols = list(range(p)) |
| all_pairs = list(combinations(cols, 2)) |
| |
| |
| max_pairs = min(len(all_pairs), 500) |
| if len(all_pairs) > max_pairs: |
| rng = np.random.RandomState(42) |
| pair_indices = rng.choice(len(all_pairs), max_pairs, replace=False) |
| pairs = [all_pairs[i] for i in pair_indices] |
| else: |
| pairs = all_pairs |
| |
| chi2_stats = [] |
| pvals = [] |
| cramers_vs = [] |
| |
| for i, j in pairs: |
| try: |
| ct = pd.crosstab(df.iloc[:, i], df.iloc[:, j]) |
| if ct.shape[0] < 2 or ct.shape[1] < 2: |
| continue |
| chi2, pval, dof, expected = chi2_contingency(ct) |
| chi2_stats.append(chi2) |
| pvals.append(pval) |
| |
| |
| min_dim = min(ct.shape[0], ct.shape[1]) - 1 |
| if min_dim > 0 and n > 0: |
| v = np.sqrt(chi2 / (n * min_dim)) |
| cramers_vs.append(v) |
| except Exception: |
| continue |
| |
| chi2_stats = np.array(chi2_stats) if chi2_stats else np.array([0.0]) |
| pvals = np.array(pvals) if pvals else np.array([1.0]) |
| cramers_vs = np.array(cramers_vs) if cramers_vs else np.array([0.0]) |
| |
| return { |
| 'density_proxy': (pvals < alpha).mean(), |
| 'mean_chi2_stat': chi2_stats.mean(), |
| 'std_chi2_stat': chi2_stats.std(), |
| 'max_chi2_stat': chi2_stats.max(), |
| 'mean_cramers_v': cramers_vs.mean(), |
| 'max_cramers_v': cramers_vs.max(), |
| 'frac_weak_deps': (cramers_vs < 0.1).mean(), |
| 'frac_strong_deps': (cramers_vs > 0.3).mean(), |
| } |
|
|
|
|
| def _ci_probe_features(df, n_probes=100, alpha=0.05): |
| """Tier 4: Conditional independence test landmark probes. |
| |
| Sample random (X, Y, Z) triplets: |
| - Test X ⊥ Y (marginal) |
| - Test X ⊥ Y | Z (conditional) |
| Summarize test statistics. |
| """ |
| n, p = df.shape |
| |
| if p < 3: |
| return { |
| 'mean_pval_marginal': 0.5, |
| 'frac_dep_marginal': 0.5, |
| 'mean_pval_conditional': 0.5, |
| 'frac_dep_conditional': 0.5, |
| 'v_structure_proxy': 0.0, |
| 'faithfulness_proxy': 0.0, |
| } |
| |
| rng = np.random.RandomState(42) |
| n_probes = min(n_probes, p * (p - 1) * (p - 2) // 6) |
| |
| pvals_marginal = [] |
| pvals_conditional = [] |
| |
| for _ in range(n_probes): |
| try: |
| idxs = rng.choice(p, size=3, replace=False) |
| i, j, k = idxs |
| |
| |
| ct = pd.crosstab(df.iloc[:, i], df.iloc[:, j]) |
| if ct.shape[0] >= 2 and ct.shape[1] >= 2: |
| _, pval, _, _ = chi2_contingency(ct) |
| pvals_marginal.append(pval) |
| |
| |
| |
| z_vals = df.iloc[:, k].unique() |
| cond_pvals = [] |
| for z_val in z_vals: |
| mask = df.iloc[:, k] == z_val |
| if mask.sum() < 5: |
| continue |
| ct_cond = pd.crosstab(df.iloc[:, i][mask], df.iloc[:, j][mask]) |
| if ct_cond.shape[0] >= 2 and ct_cond.shape[1] >= 2: |
| try: |
| _, pval_c, _, _ = chi2_contingency(ct_cond) |
| cond_pvals.append(pval_c) |
| except Exception: |
| pass |
| |
| if cond_pvals: |
| |
| pvals_conditional.append(np.mean(cond_pvals)) |
| except Exception: |
| continue |
| |
| pvals_marginal = np.array(pvals_marginal) if pvals_marginal else np.array([0.5]) |
| pvals_conditional = np.array(pvals_conditional) if pvals_conditional else np.array([0.5]) |
| |
| frac_dep_m = (pvals_marginal < alpha).mean() |
| frac_dep_c = (pvals_conditional < alpha).mean() |
| |
| return { |
| 'mean_pval_marginal': pvals_marginal.mean(), |
| 'frac_dep_marginal': frac_dep_m, |
| 'mean_pval_conditional': pvals_conditional.mean(), |
| 'frac_dep_conditional': frac_dep_c, |
| 'v_structure_proxy': frac_dep_m - frac_dep_c, |
| 'faithfulness_proxy': abs(frac_dep_m - frac_dep_c), |
| } |
|
|
|
|
| def _distribution_features(df): |
| """Tier 5: Distribution shape features.""" |
| n, p = df.shape |
| |
| mode_freqs = [] |
| balance_scores = [] |
| cardinalities = [] |
| |
| for col in df.columns: |
| vc = df[col].value_counts(normalize=True) |
| mode_freqs.append(vc.iloc[0]) |
| |
| card = len(vc) |
| cardinalities.append(card) |
| |
| |
| if card > 1: |
| h = entropy(vc) |
| max_h = np.log(card) |
| balance_scores.append(h / max_h if max_h > 0 else 0) |
| else: |
| balance_scores.append(0.0) |
| |
| mode_freqs = np.array(mode_freqs) |
| balance_scores = np.array(balance_scores) |
| cardinalities = np.array(cardinalities) |
| |
| return { |
| 'mean_mode_frequency': mode_freqs.mean(), |
| 'std_mode_frequency': mode_freqs.std(), |
| 'mean_balance': balance_scores.mean(), |
| 'uniformity_score': balance_scores.mean(), |
| 'frac_binary_vars': (cardinalities == 2).mean(), |
| 'frac_high_card_vars': (cardinalities > 5).mean(), |
| } |
|
|
|
|
| def _mutual_information(x, y): |
| """Compute mutual information between two discrete arrays.""" |
| |
| from collections import Counter |
| n = len(x) |
| joint = Counter(zip(x, y)) |
| marginal_x = Counter(x) |
| marginal_y = Counter(y) |
| |
| mi = 0.0 |
| for (xi, yi), count in joint.items(): |
| p_xy = count / n |
| p_x = marginal_x[xi] / n |
| p_y = marginal_y[yi] / n |
| if p_xy > 0 and p_x > 0 and p_y > 0: |
| mi += p_xy * np.log(p_xy / (p_x * p_y)) |
| |
| return max(mi, 0.0) |
|
|
|
|
| |
# Canonical feature ordering used by features_to_vector(); keep in sync with
# the dicts returned by the per-tier extractor functions above.
FEATURE_NAMES = [
    # Tier 1: basic descriptors
    'n_samples', 'n_variables', 'n_over_p', 'avg_cardinality', 'max_cardinality', 'min_cardinality',
    # Tier 2: information-theoretic
    'mean_entropy', 'std_entropy', 'max_entropy', 'mean_pairwise_MI', 'std_pairwise_MI',
    'max_pairwise_MI', 'mean_normalized_MI', 'frac_high_MI_pairs',
    # Tier 3: dependency structure
    'density_proxy', 'mean_chi2_stat', 'std_chi2_stat', 'max_chi2_stat',
    'mean_cramers_v', 'max_cramers_v', 'frac_weak_deps', 'frac_strong_deps',
    # Tier 4: CI test landmark probes
    'mean_pval_marginal', 'frac_dep_marginal', 'mean_pval_conditional',
    'frac_dep_conditional', 'v_structure_proxy', 'faithfulness_proxy',
    # Tier 5: distribution shape
    'mean_mode_frequency', 'std_mode_frequency', 'mean_balance', 'uniformity_score',
    'frac_binary_vars', 'frac_high_card_vars',
]
|
|
|
|
def features_to_vector(features_dict):
    """Convert a feature dict to a numpy vector in FEATURE_NAMES order.

    Missing features default to 0.0 so the vector length is always fixed.
    """
    ordered_values = [features_dict.get(name, 0.0) for name in FEATURE_NAMES]
    return np.array(ordered_values)
|
|
|
|
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    from causal_selection.data.generator import load_bn_model, sample_dataset

    # Smoke test: extract the full feature set from the ASIA benchmark network.
    bn = load_bn_model('asia')
    data = sample_dataset(bn, 1000, seed=0)

    print(f"Extracting features from ASIA (N=1000)...")
    extracted = extract_all_features(data)

    # Print each feature in canonical order, flagging any missing entries.
    for feature_name in FEATURE_NAMES:
        value = extracted.get(feature_name, 'MISSING')
        if isinstance(value, float):
            print(f"  {feature_name:30s}: {value:10.4f}")
        else:
            print(f"  {feature_name:30s}: {value}")

    print(f"\nTotal features: {len(FEATURE_NAMES)}")
|
|