"""
Main benchmark runner: orchestrates data generation, algorithm runs, feature extraction,
and result collection into a meta-dataset.
"""
import os
import json
import time
import numpy as np
import pandas as pd
import logging
import warnings
from datetime import datetime
from causal_selection.data.generator import (
load_bn_model, get_true_dag_adjmat, dag_to_cpdag, sample_dataset,
SMALL_NETWORKS, MEDIUM_NETWORKS, LARGE_NETWORKS, ALL_NETWORKS,
SAMPLE_SIZES, SEEDS_PER_CONFIG, get_network_tier
)
from causal_selection.discovery.algorithms import run_algorithm, ALGORITHM_POOL
from causal_selection.discovery.evaluator import evaluate_algorithm_result
from causal_selection.features.extractor import extract_all_features, FEATURE_NAMES
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
RESULTS_DIR = '/app/causal_selection/data/results'
ALGO_NAMES = list(ALGORITHM_POOL.keys())
# Timeout per algorithm per dataset (seconds)
TIMEOUT_MAP = {
'small': 60, # 1 min for small networks
'medium': 180, # 3 min for medium networks
'large': 300, # 5 min for large networks
}
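# Note: the timeout is applied to each algorithm call individually (passed through
# run_single_config to run_algorithm as timeout_sec), not to the configuration as a whole.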
def run_single_config(network, n_samples, seed, timeout_sec=300):
"""Run all algorithms on a single (network, n_samples, seed) configuration.
Returns:
dict with:
- 'meta_features': dict of feature values
- 'metrics': dict of algo_name -> metrics dict
- 'config': dict with network, n_samples, seed, n_variables, n_true_edges
"""
logger.info(f"=== {network} N={n_samples} seed={seed} ===")
# Load network and ground truth
model = load_bn_model(network)
true_dag, node_names = get_true_dag_adjmat(model)
true_cpdag = dag_to_cpdag(true_dag)
# Sample data
t0 = time.time()
df = sample_dataset(model, n_samples, seed=seed)
sample_time = time.time() - t0
logger.info(f" Sampled {df.shape} in {sample_time:.1f}s")
# Extract meta-features
t0 = time.time()
features = extract_all_features(df, n_probe_triplets=100)
feat_time = time.time() - t0
logger.info(f" Extracted {len(features)} features in {feat_time:.1f}s")
# Run all algorithms
algo_metrics = {}
for algo_name in ALGO_NAMES:
t0 = time.time()
result = run_algorithm(algo_name, df, timeout_sec=timeout_sec)
metrics = evaluate_algorithm_result(result, true_cpdag)
algo_metrics[algo_name] = metrics
status_str = metrics['status']
if status_str == 'success':
logger.info(f" {algo_name:15s}: SHD={metrics['shd']:3d} F1={metrics['skeleton_f1']:.3f} "
f"time={metrics['runtime']:.1f}s")
else:
logger.info(f" {algo_name:15s}: {status_str} time={metrics['runtime']:.1f}s")
return {
'meta_features': features,
'metrics': algo_metrics,
'config': {
'network': network,
'n_samples': n_samples,
'seed': seed,
'n_variables': len(node_names),
'n_true_edges': int(((true_cpdag + true_cpdag.T) > 0).sum() // 2),
}
}
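# Minimal usage sketch ('asia' and 'pc' are illustrative placeholders; use any network
# known to load_bn_model and any key of ALGORITHM_POOL):
#   result = run_single_config('asia', n_samples=1000, seed=0, timeout_sec=60)
#   print(result['config']['n_variables'], result['metrics']['pc']['shd'])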
def build_meta_dataset(networks=None, save_intermediate=True):
"""Run full benchmark and build meta-dataset.
Returns:
X: pd.DataFrame of meta-features
Y_shd: pd.DataFrame of SHD per algorithm (columns = algo names)
Y_nshd: pd.DataFrame of normalized SHD
configs_df: pd.DataFrame of configuration info (network, n_samples, seed, n_variables, n_true_edges)
full_results: list of full result dicts
"""
if networks is None:
networks = ALL_NETWORKS
all_features = []
all_shd = []
all_nshd = []
all_configs = []
full_results = []
total_configs = 0
for net in networks:
tier = get_network_tier(net)
n_sizes = len(SAMPLE_SIZES[tier])
total_configs += n_sizes * SEEDS_PER_CONFIG
logger.info(f"Starting benchmark: {len(networks)} networks, ~{total_configs} configs")
config_idx = 0
for network in networks:
tier = get_network_tier(network)
sample_sizes = SAMPLE_SIZES[tier]
timeout = TIMEOUT_MAP[tier]
for n_samples in sample_sizes:
for seed in range(SEEDS_PER_CONFIG):
config_idx += 1
logger.info(f"\n[{config_idx}/{total_configs}] "
f"{network} N={n_samples} seed={seed}")
try:
result = run_single_config(network, n_samples, seed,
timeout_sec=timeout)
# Extract feature vector
feat_row = {name: result['meta_features'].get(name, 0.0)
for name in FEATURE_NAMES}
all_features.append(feat_row)
# Extract SHD vector
shd_row = {}
nshd_row = {}
for algo in ALGO_NAMES:
m = result['metrics'][algo]
shd_row[algo] = m['shd']
nshd_row[algo] = m['normalized_shd']
all_shd.append(shd_row)
all_nshd.append(nshd_row)
# Config info
all_configs.append(result['config'])
full_results.append(result)
except Exception as e:
logger.error(f"FAILED config {network} N={n_samples} seed={seed}: {e}")
continue
# Save intermediate results periodically
if save_intermediate and config_idx % 5 == 0:
_save_intermediate(all_features, all_shd, all_nshd, all_configs)
# Build final DataFrames
X = pd.DataFrame(all_features, columns=FEATURE_NAMES)
Y_shd = pd.DataFrame(all_shd, columns=ALGO_NAMES)
Y_nshd = pd.DataFrame(all_nshd, columns=ALGO_NAMES)
configs_df = pd.DataFrame(all_configs)
# Save final results
os.makedirs(RESULTS_DIR, exist_ok=True)
X.to_csv(os.path.join(RESULTS_DIR, 'meta_features.csv'), index=False)
Y_shd.to_csv(os.path.join(RESULTS_DIR, 'shd_matrix.csv'), index=False)
Y_nshd.to_csv(os.path.join(RESULTS_DIR, 'normalized_shd_matrix.csv'), index=False)
configs_df.to_csv(os.path.join(RESULTS_DIR, 'configs.csv'), index=False)
# Save full results as JSON
_save_full_results(full_results)
logger.info(f"\n=== BENCHMARK COMPLETE ===")
logger.info(f"Total configs: {len(all_features)}")
logger.info(f"Meta-feature matrix: {X.shape}")
logger.info(f"SHD matrix: {Y_shd.shape}")
logger.info(f"Results saved to {RESULTS_DIR}")
return X, Y_shd, Y_nshd, configs_df, full_results
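# Usage sketch (network names are illustrative; pass any subset of ALL_NETWORKS):
#   X, Y_shd, Y_nshd, configs_df, full_results = build_meta_dataset(networks=['asia', 'sachs'])
#   print(Y_shd.mean().sort_values())    # mean SHD per algorithm across completed configs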
def _save_intermediate(features, shds, nshds, configs):
"""Save intermediate results."""
os.makedirs(RESULTS_DIR, exist_ok=True)
pd.DataFrame(features).to_csv(os.path.join(RESULTS_DIR, 'meta_features_partial.csv'), index=False)
pd.DataFrame(shds).to_csv(os.path.join(RESULTS_DIR, 'shd_matrix_partial.csv'), index=False)
pd.DataFrame(nshds).to_csv(os.path.join(RESULTS_DIR, 'normalized_shd_partial.csv'), index=False)
pd.DataFrame(configs).to_csv(os.path.join(RESULTS_DIR, 'configs_partial.csv'), index=False)
def _save_full_results(results):
"""Save full results (without numpy arrays)."""
serializable = []
for r in results:
entry = {
'config': r['config'],
'meta_features': {k: float(v) if isinstance(v, (np.floating, np.integer)) else v
for k, v in r['meta_features'].items()},
'metrics': {}
}
for algo, m in r['metrics'].items():
entry['metrics'][algo] = {
k: float(v) if isinstance(v, (np.floating, np.integer)) else v
for k, v in m.items()
}
serializable.append(entry)
with open(os.path.join(RESULTS_DIR, 'full_results.json'), 'w') as f:
json.dump(serializable, f, indent=2)
if __name__ == '__main__':
import sys
# Allow selecting a network tier (or a single network name) from the command line
tier = sys.argv[1] if len(sys.argv) > 1 else 'small'
if tier == 'small':
networks = SMALL_NETWORKS
elif tier == 'medium':
networks = MEDIUM_NETWORKS
elif tier == 'large':
networks = LARGE_NETWORKS
elif tier == 'all':
networks = ALL_NETWORKS
else:
networks = [tier] # single network name
logger.info(f"Running benchmark for tier: {tier} ({networks})")
X, Y_shd, Y_nshd, configs, results = build_meta_dataset(networks=networks)
# Print summary
print("\n" + "=" * 80)
print("BENCHMARK SUMMARY")
print("=" * 80)
print(f"\nMeta-feature matrix: {X.shape}")
print(f"SHD matrix: {Y_shd.shape}")
print(f"\nMean SHD per algorithm:")
print(Y_shd.mean().sort_values().to_string())
print(f"\nBest algorithm per config:")
best = Y_shd.idxmin(axis=1)
print(best.value_counts().to_string())