""" Resume benchmark from partial results, then run medium and large networks too. Optimized for CPU: reduced timeouts, skip heavy combos. """ import os import sys import time import numpy as np import pandas as pd import json import logging import warnings warnings.filterwarnings('ignore') logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') logger = logging.getLogger(__name__) # Suppress the verbose BOSS/GRaSP output logging.getLogger('causallearn').setLevel(logging.WARNING) sys.path.insert(0, '/app') from causal_selection.data.generator import ( load_bn_model, get_true_dag_adjmat, dag_to_cpdag, sample_dataset, SMALL_NETWORKS, MEDIUM_NETWORKS, LARGE_NETWORKS, get_network_tier ) from causal_selection.discovery.algorithms import run_algorithm, ALGORITHM_POOL from causal_selection.discovery.evaluator import evaluate_algorithm_result from causal_selection.features.extractor import extract_all_features, FEATURE_NAMES ALGO_NAMES = list(ALGORITHM_POOL.keys()) RESULTS_DIR = '/app/causal_selection/data/results' # More aggressive sample sizes - fewer but covering range SAMPLE_SIZES_FAST = { 'small': [500, 1000, 2000, 5000], 'medium': [500, 1000, 2000], 'large': [500, 1000], } # Per-algorithm timeout (seconds) - algorithm-specific! ALGO_TIMEOUT = { 'PC_discrete': {'small': 30, 'medium': 120, 'large': 300}, 'FCI': {'small': 30, 'medium': 120, 'large': 300}, 'GES': {'small': 30, 'medium': 120, 'large': 300}, 'BOSS': {'small': 30, 'medium': 120, 'large': 300}, 'GRaSP': {'small': 30, 'medium': 120, 'large': 300}, 'HC': {'small': 30, 'medium': 60, 'large': 120}, 'Tabu': {'small': 30, 'medium': 60, 'large': 120}, 'MMHC': {'small': 30, 'medium': 60, 'large': 120}, 'K2': {'small': 20, 'medium': 30, 'large': 60}, } SEEDS = 2 # Reduced from 3 to speed up def load_existing_results(): """Load existing partial results to avoid re-running.""" existing = set() partial_path = os.path.join(RESULTS_DIR, 'configs_partial.csv') final_path = os.path.join(RESULTS_DIR, 'configs.csv') for path in [partial_path, final_path]: if os.path.exists(path): df = pd.read_csv(path) for _, row in df.iterrows(): key = (row['network'], int(row['n_samples']), int(row['seed'])) existing.add(key) return existing def run_benchmark(): """Run full benchmark with resume capability.""" existing = load_existing_results() logger.info(f"Found {len(existing)} existing configs") # Load existing partial data all_features = [] all_shd = [] all_nshd = [] all_configs = [] for prefix in ['meta_features_partial', 'meta_features']: path = os.path.join(RESULTS_DIR, f'{prefix}.csv') if os.path.exists(path): df = pd.read_csv(path) all_features = df.to_dict('records') break for prefix in ['shd_matrix_partial', 'shd_matrix']: path = os.path.join(RESULTS_DIR, f'{prefix}.csv') if os.path.exists(path): df = pd.read_csv(path) all_shd = df.to_dict('records') break for prefix in ['normalized_shd_partial', 'normalized_shd_matrix']: path = os.path.join(RESULTS_DIR, f'{prefix}.csv') if os.path.exists(path): df = pd.read_csv(path) all_nshd = df.to_dict('records') break for prefix in ['configs_partial', 'configs']: path = os.path.join(RESULTS_DIR, f'{prefix}.csv') if os.path.exists(path): df = pd.read_csv(path) all_configs = df.to_dict('records') break logger.info(f"Starting with {len(all_configs)} existing results") # Generate all configs to run all_networks = SMALL_NETWORKS + MEDIUM_NETWORKS + LARGE_NETWORKS configs_to_run = [] for net in all_networks: tier = get_network_tier(net) for n_samples in SAMPLE_SIZES_FAST[tier]: for seed in 

def run_benchmark():
    """Run full benchmark with resume capability."""
    existing = load_existing_results()
    logger.info(f"Found {len(existing)} existing configs")

    # Load existing partial data, preferring the *_partial files when present.
    all_features = []
    all_shd = []
    all_nshd = []
    all_configs = []
    for prefix in ['meta_features_partial', 'meta_features']:
        path = os.path.join(RESULTS_DIR, f'{prefix}.csv')
        if os.path.exists(path):
            all_features = pd.read_csv(path).to_dict('records')
            break
    for prefix in ['shd_matrix_partial', 'shd_matrix']:
        path = os.path.join(RESULTS_DIR, f'{prefix}.csv')
        if os.path.exists(path):
            all_shd = pd.read_csv(path).to_dict('records')
            break
    # _save_results() writes normalized_shd_matrix{suffix}.csv, so the partial
    # file is normalized_shd_matrix_partial.csv, not normalized_shd_partial.csv.
    for prefix in ['normalized_shd_matrix_partial', 'normalized_shd_matrix']:
        path = os.path.join(RESULTS_DIR, f'{prefix}.csv')
        if os.path.exists(path):
            all_nshd = pd.read_csv(path).to_dict('records')
            break
    for prefix in ['configs_partial', 'configs']:
        path = os.path.join(RESULTS_DIR, f'{prefix}.csv')
        if os.path.exists(path):
            all_configs = pd.read_csv(path).to_dict('records')
            break
    logger.info(f"Starting with {len(all_configs)} existing results")

    # Generate every (network, n_samples, seed) config that still needs to run.
    all_networks = SMALL_NETWORKS + MEDIUM_NETWORKS + LARGE_NETWORKS
    configs_to_run = []
    for net in all_networks:
        tier = get_network_tier(net)
        for n_samples in SAMPLE_SIZES_FAST[tier]:
            for seed in range(SEEDS):
                key = (net, n_samples, seed)
                if key not in existing:
                    configs_to_run.append((net, n_samples, seed, tier))
    logger.info(f"Configs to run: {len(configs_to_run)}")

    total = len(configs_to_run)
    for idx, (network, n_samples, seed, tier) in enumerate(configs_to_run):
        logger.info(f"\n[{idx + 1}/{total}] {network} N={n_samples} seed={seed}")
        try:
            # Load the ground-truth network and derive its CPDAG.
            model = load_bn_model(network)
            true_dag, node_names = get_true_dag_adjmat(model)
            true_cpdag = dag_to_cpdag(true_dag)

            # Sample data
            df = sample_dataset(model, n_samples, seed=seed)

            # Extract meta-features from the sampled data.
            features = extract_all_features(df, n_probe_triplets=80)

            # Run every algorithm with its tier-specific timeout.
            algo_metrics = {}
            for algo_name in ALGO_NAMES:
                timeout = ALGO_TIMEOUT[algo_name][tier]
                result = run_algorithm(algo_name, df, timeout_sec=timeout)
                metrics = evaluate_algorithm_result(result, true_cpdag)
                algo_metrics[algo_name] = metrics
                s = metrics['status']
                if s == 'success':
                    logger.info(f"  {algo_name:12s}: SHD={metrics['shd']:3d} "
                                f"F1={metrics['skeleton_f1']:.3f} t={metrics['runtime']:.1f}s")
                else:
                    logger.info(f"  {algo_name:12s}: {s} t={metrics['runtime']:.1f}s")

            # Store results
            feat_row = {name: features.get(name, 0.0) for name in FEATURE_NAMES}
            all_features.append(feat_row)
            shd_row = {algo: algo_metrics[algo]['shd'] for algo in ALGO_NAMES}
            nshd_row = {algo: algo_metrics[algo]['normalized_shd'] for algo in ALGO_NAMES}
            all_shd.append(shd_row)
            all_nshd.append(nshd_row)
            config = {
                'network': network,
                'n_samples': n_samples,
                'seed': seed,
                'n_variables': len(node_names),
                # Undirected skeleton edge count of the true CPDAG.
                'n_true_edges': int(((true_cpdag + true_cpdag.T) > 0).sum() // 2),
            }
            all_configs.append(config)

            # Save periodically so an interrupted run loses little work.
            if (idx + 1) % 3 == 0:
                _save_results(all_features, all_shd, all_nshd, all_configs, partial=True)
        except Exception as e:
            logger.error(f"FAILED {network} N={n_samples} seed={seed}: {e}")
            traceback.print_exc()

    # Final save
    _save_results(all_features, all_shd, all_nshd, all_configs, partial=False)

    # Print summary
    Y_shd = pd.DataFrame(all_shd)
    configs_df = pd.DataFrame(all_configs)
    print("\n" + "=" * 80)
    print("BENCHMARK COMPLETE")
    print("=" * 80)
    print(f"Total configs: {len(all_configs)}")
    print(f"Networks: {configs_df['network'].unique()}")
    print("\nMean SHD per algorithm:")
    print(Y_shd.mean().sort_values())
    print("\nBest algorithm count:")
    print(Y_shd.idxmin(axis=1).value_counts())


def _save_results(features, shds, nshds, configs, partial=True):
    """Write all four result tables, either as *_partial.csv or as final CSVs."""
    os.makedirs(RESULTS_DIR, exist_ok=True)
    suffix = '_partial' if partial else ''
    pd.DataFrame(features).to_csv(os.path.join(RESULTS_DIR, f'meta_features{suffix}.csv'), index=False)
    pd.DataFrame(shds).to_csv(os.path.join(RESULTS_DIR, f'shd_matrix{suffix}.csv'), index=False)
    pd.DataFrame(nshds).to_csv(os.path.join(RESULTS_DIR, f'normalized_shd_matrix{suffix}.csv'), index=False)
    pd.DataFrame(configs).to_csv(os.path.join(RESULTS_DIR, f'configs{suffix}.csv'), index=False)


if __name__ == '__main__':
    run_benchmark()
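
# Usage notes (describing the behavior of the script above; the filename is
# repo-specific):
#   - Run directly, e.g. `python <this_script>.py`. Results are flushed to
#     *_partial.csv every 3 completed configs, so a killed run loses at most
#     2 configs of work.
#   - Re-running the script resumes: any (network, n_samples, seed) triple
#     already present in configs(_partial).csv is skipped.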