| """ |
| Main benchmark runner: orchestrates data generation, algorithm runs, feature extraction, |
| and result collection into a meta-dataset. |
| """ |
| import os |
| import json |
| import time |
| import numpy as np |
| import pandas as pd |
| import logging |
| import warnings |
| from datetime import datetime |
|
|
| from causal_selection.data.generator import ( |
| load_bn_model, get_true_dag_adjmat, dag_to_cpdag, sample_dataset, |
| SMALL_NETWORKS, MEDIUM_NETWORKS, LARGE_NETWORKS, ALL_NETWORKS, |
| SAMPLE_SIZES, SEEDS_PER_CONFIG, get_network_tier |
| ) |
| from causal_selection.discovery.algorithms import run_algorithm, ALGORITHM_POOL |
| from causal_selection.discovery.evaluator import evaluate_algorithm_result |
| from causal_selection.features.extractor import extract_all_features, FEATURE_NAMES |
|
|
# Suppress third-party library warnings during long benchmark runs.
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)


# Directory where all benchmark artifacts (CSV matrices, JSON results) are written.
RESULTS_DIR = '/app/causal_selection/data/results'
# Stable ordering of candidate algorithm names, used as result-matrix columns.
ALGO_NAMES = list(ALGORITHM_POOL.keys())


# Per-algorithm wall-clock budget in seconds, keyed by network size tier
# (see get_network_tier); larger networks get a longer budget.
TIMEOUT_MAP = {
    'small': 60,
    'medium': 180,
    'large': 300,
}
|
|
|
|
def run_single_config(network, n_samples, seed, timeout_sec=300):
    """Benchmark every pooled algorithm on one (network, n_samples, seed) setup.

    Args:
        network: name of the Bayesian network to load.
        n_samples: number of rows to sample from the network.
        seed: RNG seed for dataset sampling.
        timeout_sec: per-algorithm wall-clock limit in seconds.

    Returns:
        dict with:
            - 'meta_features': dict of feature values
            - 'metrics': dict of algo_name -> metrics dict
            - 'config': dict describing this run (network, n_samples, seed, ...)
    """
    logger.info(f"=== {network} N={n_samples} seed={seed} ===")

    # Ground truth: load the network and derive the CPDAG that results are scored against.
    model = load_bn_model(network)
    true_dag, node_names = get_true_dag_adjmat(model)
    true_cpdag = dag_to_cpdag(true_dag)

    # Draw the synthetic dataset from the network.
    start = time.time()
    df = sample_dataset(model, n_samples, seed=seed)
    sample_time = time.time() - start
    logger.info(f" Sampled {df.shape} in {sample_time:.1f}s")

    # Extract dataset-level meta-features (inputs to the algorithm selector).
    start = time.time()
    features = extract_all_features(df, n_probe_triplets=100)
    feat_time = time.time() - start
    logger.info(f" Extracted {len(features)} features in {feat_time:.1f}s")

    # Run each candidate algorithm and score it against the true CPDAG.
    algo_metrics = {}
    for algo_name in ALGO_NAMES:
        outcome = run_algorithm(algo_name, df, timeout_sec=timeout_sec)
        scores = evaluate_algorithm_result(outcome, true_cpdag)
        algo_metrics[algo_name] = scores

        if scores['status'] == 'success':
            logger.info(f" {algo_name:15s}: SHD={scores['shd']:3d} F1={scores['skeleton_f1']:.3f} "
                        f"time={scores['runtime']:.1f}s")
        else:
            logger.info(f" {algo_name:15s}: {scores['status']} time={scores['runtime']:.1f}s")

    # Undirected edge count: symmetrize the CPDAG adjacency and halve the sum.
    n_true_edges = int(((true_cpdag + true_cpdag.T) > 0).sum() // 2)

    return {
        'meta_features': features,
        'metrics': algo_metrics,
        'config': {
            'network': network,
            'n_samples': n_samples,
            'seed': seed,
            'n_variables': len(node_names),
            'n_true_edges': n_true_edges,
        }
    }
|
|
|
|
def build_meta_dataset(networks=None, save_intermediate=True):
    """Run the full benchmark and assemble the meta-dataset.

    Args:
        networks: list of network names to benchmark; defaults to ALL_NETWORKS.
        save_intermediate: if True, checkpoint partial CSVs every 5 configs.

    Returns:
        X: pd.DataFrame of meta-features (one row per successful config)
        Y_shd: pd.DataFrame of SHD per algorithm (columns = algo names)
        Y_nshd: pd.DataFrame of normalized SHD
        configs_df: pd.DataFrame of config descriptions (one row per config)
        full_results: list of full result dicts
    """
    if networks is None:
        networks = ALL_NETWORKS

    all_features = []
    all_shd = []
    all_nshd = []
    all_configs = []
    full_results = []

    # Expected number of configs, used only for progress reporting.
    total_configs = sum(
        len(SAMPLE_SIZES[get_network_tier(net)]) * SEEDS_PER_CONFIG
        for net in networks
    )

    logger.info(f"Starting benchmark: {len(networks)} networks, ~{total_configs} configs")
    config_idx = 0

    for network in networks:
        tier = get_network_tier(network)
        sample_sizes = SAMPLE_SIZES[tier]
        timeout = TIMEOUT_MAP[tier]

        for n_samples in sample_sizes:
            for seed in range(SEEDS_PER_CONFIG):
                config_idx += 1
                logger.info(f"\n[{config_idx}/{total_configs}] "
                            f"{network} N={n_samples} seed={seed}")

                try:
                    result = run_single_config(network, n_samples, seed,
                                               timeout_sec=timeout)

                    # Feature row in canonical FEATURE_NAMES order;
                    # any missing feature defaults to 0.0.
                    feat_row = {name: result['meta_features'].get(name, 0.0)
                                for name in FEATURE_NAMES}
                    all_features.append(feat_row)

                    # Outcome rows: raw and normalized SHD per algorithm.
                    shd_row = {}
                    nshd_row = {}
                    for algo in ALGO_NAMES:
                        m = result['metrics'][algo]
                        shd_row[algo] = m['shd']
                        nshd_row[algo] = m['normalized_shd']
                    all_shd.append(shd_row)
                    all_nshd.append(nshd_row)

                    all_configs.append(result['config'])
                    full_results.append(result)

                except Exception:
                    # logger.exception preserves the traceback; logger.error
                    # with only str(e) would silently drop it.
                    logger.exception(f"FAILED config {network} N={n_samples} seed={seed}")
                    continue

                # Checkpoint partial results so a crash doesn't lose everything.
                # Note: skipped for failed configs (the except branch continues).
                if save_intermediate and config_idx % 5 == 0:
                    _save_intermediate(all_features, all_shd, all_nshd, all_configs)

    # Assemble final matrices with stable column orderings.
    X = pd.DataFrame(all_features, columns=FEATURE_NAMES)
    Y_shd = pd.DataFrame(all_shd, columns=ALGO_NAMES)
    Y_nshd = pd.DataFrame(all_nshd, columns=ALGO_NAMES)
    configs_df = pd.DataFrame(all_configs)

    # Persist everything to RESULTS_DIR.
    os.makedirs(RESULTS_DIR, exist_ok=True)
    X.to_csv(os.path.join(RESULTS_DIR, 'meta_features.csv'), index=False)
    Y_shd.to_csv(os.path.join(RESULTS_DIR, 'shd_matrix.csv'), index=False)
    Y_nshd.to_csv(os.path.join(RESULTS_DIR, 'normalized_shd_matrix.csv'), index=False)
    configs_df.to_csv(os.path.join(RESULTS_DIR, 'configs.csv'), index=False)

    _save_full_results(full_results)

    logger.info(f"\n=== BENCHMARK COMPLETE ===")
    logger.info(f"Total configs: {len(all_features)}")
    logger.info(f"Meta-feature matrix: {X.shape}")
    logger.info(f"SHD matrix: {Y_shd.shape}")
    logger.info(f"Results saved to {RESULTS_DIR}")

    return X, Y_shd, Y_nshd, configs_df, full_results
|
|
|
|
def _save_intermediate(features, shds, nshds, configs):
    """Checkpoint partial benchmark results as CSV files in RESULTS_DIR."""
    os.makedirs(RESULTS_DIR, exist_ok=True)
    partial_outputs = [
        (features, 'meta_features_partial.csv'),
        (shds, 'shd_matrix_partial.csv'),
        (nshds, 'normalized_shd_partial.csv'),
        (configs, 'configs_partial.csv'),
    ]
    for rows, filename in partial_outputs:
        pd.DataFrame(rows).to_csv(os.path.join(RESULTS_DIR, filename), index=False)
|
|
|
|
def _save_full_results(results):
    """Save full benchmark results as JSON in RESULTS_DIR/full_results.json.

    Converts numpy values to plain Python types first: the original code only
    handled np.floating/np.integer scalars, so np.bool_ values or np.ndarray
    values anywhere in the metrics/features dicts would make json.dump raise
    TypeError. Bools, numeric scalars, and arrays are now all covered.

    Args:
        results: list of result dicts as produced by run_single_config.
    """
    def _jsonable(v):
        # np.bool_ is checked first: it is not an np.integer, and converting
        # it to float would change its JSON representation.
        if isinstance(v, np.bool_):
            return bool(v)
        if isinstance(v, (np.floating, np.integer)):
            # Matches original behavior: numeric scalars become floats.
            return float(v)
        if isinstance(v, np.ndarray):
            return v.tolist()
        return v

    serializable = []
    for r in results:
        serializable.append({
            'config': r['config'],
            'meta_features': {k: _jsonable(v) for k, v in r['meta_features'].items()},
            'metrics': {algo: {k: _jsonable(v) for k, v in m.items()}
                        for algo, m in r['metrics'].items()},
        })

    # Robustness: ensure the output directory exists even when this is called
    # standalone (consistent with the other save helpers).
    os.makedirs(RESULTS_DIR, exist_ok=True)
    with open(os.path.join(RESULTS_DIR, 'full_results.json'), 'w') as f:
        json.dump(serializable, f, indent=2)
|
|
|
|
if __name__ == '__main__':
    import sys

    # CLI arg selects a size tier ('small' | 'medium' | 'large' | 'all');
    # any other value is treated as a single network name.
    tier = sys.argv[1] if len(sys.argv) > 1 else 'small'

    tier_to_networks = {
        'small': SMALL_NETWORKS,
        'medium': MEDIUM_NETWORKS,
        'large': LARGE_NETWORKS,
        'all': ALL_NETWORKS,
    }
    networks = tier_to_networks.get(tier, [tier])

    logger.info(f"Running benchmark for tier: {tier} ({networks})")
    X, Y_shd, Y_nshd, configs, results = build_meta_dataset(networks=networks)

    # Human-readable summary of the completed run.
    print("\n" + "=" * 80)
    print("BENCHMARK SUMMARY")
    print("=" * 80)
    print(f"\nMeta-feature matrix: {X.shape}")
    print(f"SHD matrix: {Y_shd.shape}")
    print(f"\nMean SHD per algorithm:")
    print(Y_shd.mean().sort_values().to_string())
    print(f"\nBest algorithm per config:")
    best = Y_shd.idxmin(axis=1)
    print(best.value_counts().to_string())
|
|