""" Meta-feature extraction module for discrete observational datasets. Extracts ~34 features across 5 categories: - Tier 1: Basic descriptors (6 features) - Tier 2: Information-theoretic (8 features) - Tier 3: Dependency structure (8 features) - Tier 4: CI test landmark probes (6 features) - Tier 5: Distribution shape (6 features) """ import numpy as np import pandas as pd from scipy.stats import entropy, chi2_contingency from itertools import combinations import warnings import logging warnings.filterwarnings('ignore') logger = logging.getLogger(__name__) def extract_all_features(df, n_probe_triplets=100, alpha=0.05): """Extract all meta-features from a discrete dataset. Args: df: pd.DataFrame with integer-encoded discrete columns n_probe_triplets: number of random (X,Y,Z) triplets for CI probes alpha: significance level for dependency tests Returns: dict of feature_name -> float """ features = {} # Tier 1: Basic descriptors features.update(_basic_features(df)) # Tier 2: Information-theoretic features.update(_info_theory_features(df)) # Tier 3: Dependency structure features.update(_dependency_features(df, alpha=alpha)) # Tier 4: CI test landmark probes features.update(_ci_probe_features(df, n_probes=n_probe_triplets, alpha=alpha)) # Tier 5: Distribution shape features.update(_distribution_features(df)) return features def _basic_features(df): """Tier 1: Basic dataset descriptors.""" n_samples, n_vars = df.shape cardinalities = df.nunique().values return { 'n_samples': n_samples, 'n_variables': n_vars, 'n_over_p': n_samples / max(n_vars, 1), 'avg_cardinality': cardinalities.mean(), 'max_cardinality': cardinalities.max(), 'min_cardinality': cardinalities.min(), } def _info_theory_features(df): """Tier 2: Information-theoretic features.""" n, p = df.shape # Per-variable entropy entropies = [] for col in df.columns: vc = df[col].value_counts(normalize=True) entropies.append(entropy(vc)) entropies = np.array(entropies) # Pairwise mutual information (subsample if too many pairs) cols = list(range(p)) all_pairs = list(combinations(cols, 2)) # Limit pairs for large datasets max_pairs = min(len(all_pairs), 500) if len(all_pairs) > max_pairs: rng = np.random.RandomState(42) pair_indices = rng.choice(len(all_pairs), max_pairs, replace=False) pairs = [all_pairs[i] for i in pair_indices] else: pairs = all_pairs mis = [] norm_mis = [] for i, j in pairs: mi = _mutual_information(df.iloc[:, i].values, df.iloc[:, j].values) mis.append(mi) # Normalized MI denom = np.sqrt(entropies[i] * entropies[j]) if denom > 1e-10: norm_mis.append(mi / denom) else: norm_mis.append(0.0) mis = np.array(mis) norm_mis = np.array(norm_mis) return { 'mean_entropy': entropies.mean(), 'std_entropy': entropies.std(), 'max_entropy': entropies.max(), 'mean_pairwise_MI': mis.mean(), 'std_pairwise_MI': mis.std(), 'max_pairwise_MI': mis.max(), 'mean_normalized_MI': norm_mis.mean(), 'frac_high_MI_pairs': (mis > 0.05).mean(), # threshold for "meaningful" MI } def _dependency_features(df, alpha=0.05): """Tier 3: Dependency structure features via chi-squared tests.""" n, p = df.shape cols = list(range(p)) all_pairs = list(combinations(cols, 2)) # Limit pairs max_pairs = min(len(all_pairs), 500) if len(all_pairs) > max_pairs: rng = np.random.RandomState(42) pair_indices = rng.choice(len(all_pairs), max_pairs, replace=False) pairs = [all_pairs[i] for i in pair_indices] else: pairs = all_pairs chi2_stats = [] pvals = [] cramers_vs = [] for i, j in pairs: try: ct = pd.crosstab(df.iloc[:, i], df.iloc[:, j]) if ct.shape[0] < 2 or ct.shape[1] < 
def _dependency_features(df, alpha=0.05):
    """Tier 3: Dependency structure features via chi-squared tests."""
    n, p = df.shape
    all_pairs = list(combinations(range(p), 2))

    # Limit pairs for large datasets
    max_pairs = min(len(all_pairs), 500)
    if len(all_pairs) > max_pairs:
        rng = np.random.RandomState(42)
        pair_indices = rng.choice(len(all_pairs), max_pairs, replace=False)
        pairs = [all_pairs[i] for i in pair_indices]
    else:
        pairs = all_pairs

    chi2_stats = []
    pvals = []
    cramers_vs = []
    for i, j in pairs:
        try:
            ct = pd.crosstab(df.iloc[:, i], df.iloc[:, j])
            # Skip degenerate tables (a constant column yields a 1-d table)
            if ct.shape[0] < 2 or ct.shape[1] < 2:
                continue
            chi2, pval, dof, expected = chi2_contingency(ct)
            chi2_stats.append(chi2)
            pvals.append(pval)
            # Cramér's V
            min_dim = min(ct.shape[0], ct.shape[1]) - 1
            if min_dim > 0 and n > 0:
                v = np.sqrt(chi2 / (n * min_dim))
                cramers_vs.append(v)
        except Exception:
            continue

    chi2_stats = np.array(chi2_stats) if chi2_stats else np.array([0.0])
    pvals = np.array(pvals) if pvals else np.array([1.0])
    cramers_vs = np.array(cramers_vs) if cramers_vs else np.array([0.0])

    return {
        'density_proxy': (pvals < alpha).mean(),
        'mean_chi2_stat': chi2_stats.mean(),
        'std_chi2_stat': chi2_stats.std(),
        'max_chi2_stat': chi2_stats.max(),
        'mean_cramers_v': cramers_vs.mean(),
        'max_cramers_v': cramers_vs.max(),
        'frac_weak_deps': (cramers_vs < 0.1).mean(),
        'frac_strong_deps': (cramers_vs > 0.3).mean(),
    }


def _ci_probe_features(df, n_probes=100, alpha=0.05):
    """Tier 4: Conditional independence test landmark probes.

    Sample random (X, Y, Z) triplets:
    - Test X ⊥ Y (marginal)
    - Test X ⊥ Y | Z (conditional, stratified by the values of Z)

    Summarize the resulting test statistics.
    """
    n, p = df.shape
    if p < 3:
        # Too few variables to form a triplet; return neutral defaults
        return {
            'mean_pval_marginal': 0.5,
            'frac_dep_marginal': 0.5,
            'mean_pval_conditional': 0.5,
            'frac_dep_conditional': 0.5,
            'v_structure_proxy': 0.0,
            'faithfulness_proxy': 0.0,
        }

    rng = np.random.RandomState(42)
    # Cap at the number of distinct triplets
    n_probes = min(n_probes, p * (p - 1) * (p - 2) // 6)

    pvals_marginal = []
    pvals_conditional = []
    for _ in range(n_probes):
        try:
            i, j, k = rng.choice(p, size=3, replace=False)

            # Marginal test: X ⊥ Y
            ct = pd.crosstab(df.iloc[:, i], df.iloc[:, j])
            if ct.shape[0] >= 2 and ct.shape[1] >= 2:
                _, pval, _, _ = chi2_contingency(ct)
                pvals_marginal.append(pval)

            # Conditional test: X ⊥ Y | Z, stratified by Z values
            z_vals = df.iloc[:, k].unique()
            cond_pvals = []
            for z_val in z_vals:
                mask = df.iloc[:, k] == z_val
                if mask.sum() < 5:  # skip strata too small to test
                    continue
                ct_cond = pd.crosstab(df.iloc[:, i][mask], df.iloc[:, j][mask])
                if ct_cond.shape[0] >= 2 and ct_cond.shape[1] >= 2:
                    try:
                        _, pval_c, _, _ = chi2_contingency(ct_cond)
                        cond_pvals.append(pval_c)
                    except Exception:
                        pass
            if cond_pvals:
                # Combine per-stratum p-values by their mean; Fisher's method
                # would be a sharper alternative
                pvals_conditional.append(np.mean(cond_pvals))
        except Exception:
            continue

    pvals_marginal = np.array(pvals_marginal) if pvals_marginal else np.array([0.5])
    pvals_conditional = np.array(pvals_conditional) if pvals_conditional else np.array([0.5])

    frac_dep_m = (pvals_marginal < alpha).mean()
    frac_dep_c = (pvals_conditional < alpha).mean()

    return {
        'mean_pval_marginal': pvals_marginal.mean(),
        'frac_dep_marginal': frac_dep_m,
        'mean_pval_conditional': pvals_conditional.mean(),
        'frac_dep_conditional': frac_dep_c,
        # Conditioning screens off chain/fork dependencies but can activate
        # collider (v-structure) dependencies; the gap between marginal and
        # conditional dependence rates proxies for this structure
        'v_structure_proxy': frac_dep_m - frac_dep_c,
        'faithfulness_proxy': abs(frac_dep_m - frac_dep_c),
    }


def _distribution_features(df):
    """Tier 5: Distribution shape features."""
    n, p = df.shape
    mode_freqs = []
    balance_scores = []
    cardinalities = []
    for col in df.columns:
        vc = df[col].value_counts(normalize=True)
        mode_freqs.append(vc.iloc[0])  # frequency of the most common value
        card = len(vc)
        cardinalities.append(card)
        # Balance: entropy / log(cardinality) — 1.0 = perfectly uniform
        if card > 1:
            h = entropy(vc)
            max_h = np.log(card)
            balance_scores.append(h / max_h if max_h > 0 else 0.0)
        else:
            balance_scores.append(0.0)

    mode_freqs = np.array(mode_freqs)
    balance_scores = np.array(balance_scores)
    cardinalities = np.array(cardinalities)

    return {
        'mean_mode_frequency': mode_freqs.mean(),
        'std_mode_frequency': mode_freqs.std(),
        'mean_balance': balance_scores.mean(),
        'uniformity_score': balance_scores.mean(),  # alias for mean_balance
        'frac_binary_vars': (cardinalities == 2).mean(),
        'frac_high_card_vars': (cardinalities > 5).mean(),
    }
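

# The plug-in estimator in _mutual_information below implements
#     I(X; Y) = sum_{x, y} p(x, y) * log(p(x, y) / (p(x) * p(y)))
# with empirical probabilities and natural log (so the result is in nats).
# The plug-in estimate is biased upward on small samples, which is tolerable
# here since MI is only used as a relative meta-feature; the result is
# clipped at 0 because true MI is non-negative.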
def _mutual_information(x, y):
    """Compute mutual information (in nats) between two discrete arrays."""
    n = len(x)
    joint = Counter(zip(x, y))
    marginal_x = Counter(x)
    marginal_y = Counter(y)
    mi = 0.0
    for (xi, yi), count in joint.items():
        p_xy = count / n
        p_x = marginal_x[xi] / n
        p_y = marginal_y[yi] / n
        if p_xy > 0 and p_x > 0 and p_y > 0:
            mi += p_xy * np.log(p_xy / (p_x * p_y))
    return max(mi, 0.0)  # clip tiny negatives from floating-point error


# Feature names for consistent ordering
FEATURE_NAMES = [
    # Tier 1: Basic
    'n_samples', 'n_variables', 'n_over_p',
    'avg_cardinality', 'max_cardinality', 'min_cardinality',
    # Tier 2: Info-theoretic
    'mean_entropy', 'std_entropy', 'max_entropy',
    'mean_pairwise_MI', 'std_pairwise_MI', 'max_pairwise_MI',
    'mean_normalized_MI', 'frac_high_MI_pairs',
    # Tier 3: Dependency
    'density_proxy', 'mean_chi2_stat', 'std_chi2_stat', 'max_chi2_stat',
    'mean_cramers_v', 'max_cramers_v', 'frac_weak_deps', 'frac_strong_deps',
    # Tier 4: CI probes
    'mean_pval_marginal', 'frac_dep_marginal',
    'mean_pval_conditional', 'frac_dep_conditional',
    'v_structure_proxy', 'faithfulness_proxy',
    # Tier 5: Distribution
    'mean_mode_frequency', 'std_mode_frequency', 'mean_balance',
    'uniformity_score', 'frac_binary_vars', 'frac_high_card_vars',
]


def features_to_vector(features_dict):
    """Convert a feature dict to an ordered numpy vector (missing -> 0.0)."""
    return np.array([features_dict.get(name, 0.0) for name in FEATURE_NAMES])


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    from causal_selection.data.generator import load_bn_model, sample_dataset

    model = load_bn_model('asia')
    df = sample_dataset(model, 1000, seed=0)
    print("Extracting features from ASIA (N=1000)...")
    features = extract_all_features(df)
    for name in FEATURE_NAMES:
        val = features.get(name, 'MISSING')
        if isinstance(val, float):
            print(f" {name:30s}: {val:10.4f}")
        else:
            print(f" {name:30s}: {val}")
    print(f"\nTotal features: {len(FEATURE_NAMES)}")
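
# A minimal smoke test that does not require the causal_selection package
# (the synthetic DataFrame below is purely illustrative):
#
#     rng = np.random.RandomState(0)
#     df = pd.DataFrame(rng.randint(0, 3, size=(500, 8)))
#     vec = features_to_vector(extract_all_features(df))
#     assert vec.shape == (len(FEATURE_NAMES),)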