| """ |
| Algorithm adapters: run each causal discovery algorithm with timeout handling. |
| All algorithms take a pandas DataFrame (integer-encoded discrete data) and return |
| an adjacency matrix (np.ndarray) representing the learned graph. |
| """ |
| import numpy as np |
| import pandas as pd |
| import signal |
| import time |
| import traceback |
| import logging |
| from functools import wraps |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class TimeoutError(Exception): |
| pass |
|
|
|
|
| def timeout_handler(signum, frame): |
| raise TimeoutError("Algorithm timed out") |
|
|
|
|
| def run_with_timeout(func, timeout_sec, *args, **kwargs): |
| """Run a function with a timeout (Unix only, uses SIGALRM).""" |
| old_handler = signal.signal(signal.SIGALRM, timeout_handler) |
| signal.alarm(timeout_sec) |
| try: |
| result = func(*args, **kwargs) |
| signal.alarm(0) |
| return result |
| except TimeoutError: |
| logger.warning(f"Timeout after {timeout_sec}s") |
| raise |
| finally: |
| signal.signal(signal.SIGALRM, old_handler) |
| signal.alarm(0) |
|
|
|
|
| def safe_run(algo_name, func, timeout_sec, *args, **kwargs): |
| """Run algorithm with timeout and exception handling. |
| |
| Returns: |
| (adjmat, runtime, status): adjacency matrix, time in seconds, status string |
| """ |
| start = time.time() |
| try: |
| adjmat = run_with_timeout(func, timeout_sec, *args, **kwargs) |
| runtime = time.time() - start |
| return adjmat, runtime, 'success' |
| except TimeoutError: |
| runtime = time.time() - start |
| logger.warning(f"{algo_name}: TIMEOUT after {runtime:.1f}s") |
| return None, runtime, 'timeout' |
| except Exception as e: |
| runtime = time.time() - start |
| logger.error(f"{algo_name}: ERROR after {runtime:.1f}s: {e}") |
| logger.debug(traceback.format_exc()) |
| return None, runtime, f'error: {str(e)[:100]}' |
|
|
|
|
| |
| |
| |
|
|
| def run_pc_discrete(df, alpha=0.01, stable=True): |
| """PC algorithm for discrete data using G-squared test.""" |
| from causallearn.search.ConstraintBased.PC import pc |
| from causallearn.utils.cit import gsq |
| |
| data = df.values.astype(int) |
| cg = pc(data, alpha=alpha, indep_test=gsq, stable=stable, |
| show_progress=False) |
| |
| |
| adj = cg.G.graph |
| return _causallearn_graph_to_adjmat(adj) |
|
|
|
|
| def run_fci(df, alpha=0.05, depth=4): |
| """FCI algorithm - outputs PAG. We extract the directed edges.""" |
| from causallearn.search.ConstraintBased.FCI import fci |
| from causallearn.utils.cit import gsq |
| |
| data = df.values.astype(int) |
| g, edges = fci(data, independence_test_method=gsq, alpha=alpha, |
| depth=depth, show_progress=False) |
| |
| adj = g.graph |
| return _causallearn_pag_to_adjmat(adj) |
|
|
|
|
| |
| |
| |
|
|
| def run_ges_causallearn(df, score_func='local_score_BDeu'): |
| """GES algorithm from causal-learn.""" |
| from causallearn.search.ScoreBased.GES import ges |
| |
| data = df.values.astype(int) |
| record = ges(data, score_func=score_func, maxP=None, |
| parameters=None) |
| |
| adj = record['G'].graph |
| return _causallearn_graph_to_adjmat(adj) |
|
|
|
|
| def run_boss(df, score_func='local_score_BDeu'): |
| """BOSS (Best Order Score Search) algorithm.""" |
| from causallearn.search.PermutationBased.BOSS import boss |
| |
| data = df.values.astype(int) |
| cg = boss(data, score_func=score_func, |
| parameters=None) |
| |
| adj = cg.graph |
| return _causallearn_graph_to_adjmat(adj) |
|
|
|
|
| def run_grasp(df, score_func='local_score_BDeu', depth=3): |
| """GRaSP (Greedy relaxation of Sparsest Permutation) algorithm.""" |
| from causallearn.search.PermutationBased.GRaSP import grasp |
| |
| data = df.values.astype(int) |
| cg = grasp(data, score_func=score_func, depth=depth, |
| parameters=None) |
| |
| adj = cg.graph |
| return _causallearn_graph_to_adjmat(adj) |
|
|
|
|
| |
| |
| |
|
|
| def run_ges_pgmpy(df, scoring_method='bicscore'): |
| """GES algorithm from pgmpy.""" |
| from pgmpy.estimators import ExhaustiveSearch, HillClimbSearch, BicScore, BDeuScore |
| |
| |
| |
| scoring = BicScore(df) if scoring_method == 'bicscore' else BDeuScore(df) |
| hc = HillClimbSearch(df) |
| best_model = hc.estimate(scoring_method=scoring, max_indegree=4, |
| max_iter=100000, epsilon=0.0001) |
| |
| return _pgmpy_model_to_adjmat(best_model, sorted(df.columns)) |
|
|
|
|
| def run_hc(df, scoring_method='bicscore', max_indegree=3, max_iter=100000): |
| """Hill-Climbing algorithm.""" |
| from pgmpy.estimators import HillClimbSearch, BicScore, BDeuScore |
| |
| scoring = BicScore(df) if scoring_method == 'bicscore' else BDeuScore(df) |
| hc = HillClimbSearch(df) |
| best_model = hc.estimate(scoring_method=scoring, max_indegree=max_indegree, |
| max_iter=max_iter, epsilon=0.0001) |
| |
| return _pgmpy_model_to_adjmat(best_model, sorted(df.columns)) |
|
|
|
|
| def run_tabu(df, scoring_method='bicscore', tabu_length=100, max_indegree=3, max_iter=100000): |
| """Tabu Search algorithm.""" |
| from pgmpy.estimators import HillClimbSearch, BicScore, BDeuScore |
| |
| scoring = BicScore(df) if scoring_method == 'bicscore' else BDeuScore(df) |
| hc = HillClimbSearch(df) |
| best_model = hc.estimate(scoring_method=scoring, max_indegree=max_indegree, |
| max_iter=max_iter, epsilon=0.0001, |
| tabu_length=tabu_length) |
| |
| return _pgmpy_model_to_adjmat(best_model, sorted(df.columns)) |
|
|
|
|
| def run_mmhc(df, scoring_method='bdeuscore', significance_level=0.01): |
| """Max-Min Hill-Climbing (MMHC) hybrid algorithm.""" |
| from pgmpy.estimators import MmhcEstimator, BDeuScore, BicScore |
| |
| mmhc = MmhcEstimator(df) |
| scoring = BDeuScore(df) if scoring_method == 'bdeuscore' else BicScore(df) |
| best_model = mmhc.estimate(scoring_method=scoring, |
| significance_level=significance_level) |
| |
| return _pgmpy_model_to_adjmat(best_model, sorted(df.columns)) |
|
|
|
|
| def run_k2(df, max_parents=3): |
| """K2 algorithm (requires node ordering - we use alphabetical as default). |
| We run with multiple random orderings and take the best-scoring result. |
| """ |
| from pgmpy.estimators import HillClimbSearch, K2Score |
| |
| scoring = K2Score(df) |
| hc = HillClimbSearch(df) |
| best_model = hc.estimate(scoring_method=scoring, max_indegree=max_parents, |
| max_iter=100000, epsilon=0.0001) |
| |
| return _pgmpy_model_to_adjmat(best_model, sorted(df.columns)) |
|
|
|
|
| |
| |
| |
|
|
| def _causallearn_graph_to_adjmat(graph_matrix): |
| """Convert causal-learn's graph representation to standard adjacency matrix. |
| |
| causal-learn encoding: |
| graph[i,j] = -1 and graph[j,i] = 1 means i -> j |
| graph[i,j] = -1 and graph[j,i] = -1 means i -- j (undirected) |
| graph[i,j] = 1 and graph[j,i] = 1 means i <-> j (bidirected) |
| |
| Our encoding: |
| adj[i,j] = 1 and adj[j,i] = 0 means i -> j |
| adj[i,j] = 1 and adj[j,i] = 1 means i -- j (undirected) |
| """ |
| n = graph_matrix.shape[0] |
| adj = np.zeros((n, n), dtype=int) |
| |
| for i in range(n): |
| for j in range(n): |
| if i == j: |
| continue |
| if graph_matrix[i, j] == -1 and graph_matrix[j, i] == 1: |
| |
| adj[i, j] = 1 |
| elif graph_matrix[i, j] == -1 and graph_matrix[j, i] == -1: |
| |
| adj[i, j] = 1 |
| adj[j, i] = 1 |
| elif graph_matrix[i, j] == 1 and graph_matrix[j, i] == 1: |
| |
| adj[i, j] = 1 |
| adj[j, i] = 1 |
| |
| return adj |
|
|
|
|
| def _causallearn_pag_to_adjmat(pag_matrix): |
| """Convert PAG (from FCI) to adjacency matrix. |
| |
| PAG encoding in causal-learn: |
| 1 = arrowhead (>), -1 = tail (-), 2 = circle (o) |
| |
| We extract: definite directed edges and definite adjacencies. |
| """ |
| n = pag_matrix.shape[0] |
| adj = np.zeros((n, n), dtype=int) |
| |
| for i in range(n): |
| for j in range(n): |
| if i == j: |
| continue |
| |
| if pag_matrix[i, j] == -1 and pag_matrix[j, i] == 1: |
| adj[i, j] = 1 |
| |
| elif pag_matrix[i, j] != 0 and pag_matrix[j, i] != 0: |
| adj[i, j] = 1 |
| adj[j, i] = 1 |
| |
| return adj |
|
|
|
|
| def _pgmpy_model_to_adjmat(model, node_names): |
| """Convert pgmpy DAGModel to adjacency matrix.""" |
| n = len(node_names) |
| node_idx = {name: i for i, name in enumerate(node_names)} |
| adj = np.zeros((n, n), dtype=int) |
| |
| for parent, child in model.edges(): |
| if parent in node_idx and child in node_idx: |
| adj[node_idx[parent], node_idx[child]] = 1 |
| |
| return adj |
|
|
|
|
| |
| |
| |
|
|
| ALGORITHM_POOL = { |
| 'PC_discrete': { |
| 'func': run_pc_discrete, |
| 'kwargs': {'alpha': 0.01, 'stable': True}, |
| 'library': 'causal_learn', |
| 'output_type': 'cpdag', |
| 'family': 'constraint', |
| }, |
| 'FCI': { |
| 'func': run_fci, |
| 'kwargs': {'alpha': 0.05, 'depth': 4}, |
| 'library': 'causal_learn', |
| 'output_type': 'pag', |
| 'family': 'constraint', |
| }, |
| 'GES': { |
| 'func': run_ges_causallearn, |
| 'kwargs': {'score_func': 'local_score_BDeu'}, |
| 'library': 'causal_learn', |
| 'output_type': 'cpdag', |
| 'family': 'score', |
| }, |
| 'BOSS': { |
| 'func': run_boss, |
| 'kwargs': {'score_func': 'local_score_BDeu'}, |
| 'library': 'causal_learn', |
| 'output_type': 'cpdag', |
| 'family': 'permutation', |
| }, |
| 'GRaSP': { |
| 'func': run_grasp, |
| 'kwargs': {'score_func': 'local_score_BDeu', 'depth': 3}, |
| 'library': 'causal_learn', |
| 'output_type': 'cpdag', |
| 'family': 'permutation', |
| }, |
| 'HC': { |
| 'func': run_hc, |
| 'kwargs': {'scoring_method': 'bicscore', 'max_indegree': 3, 'max_iter': 100000}, |
| 'library': 'pgmpy', |
| 'output_type': 'dag', |
| 'family': 'score', |
| }, |
| 'Tabu': { |
| 'func': run_tabu, |
| 'kwargs': {'scoring_method': 'bicscore', 'tabu_length': 100, 'max_indegree': 3, 'max_iter': 100000}, |
| 'library': 'pgmpy', |
| 'output_type': 'dag', |
| 'family': 'score', |
| }, |
| 'MMHC': { |
| 'func': run_mmhc, |
| 'kwargs': {'scoring_method': 'bdeuscore', 'significance_level': 0.01}, |
| 'library': 'pgmpy', |
| 'output_type': 'dag', |
| 'family': 'hybrid', |
| }, |
| 'K2': { |
| 'func': run_k2, |
| 'kwargs': {'max_parents': 3}, |
| 'library': 'pgmpy', |
| 'output_type': 'dag', |
| 'family': 'score', |
| }, |
| } |
|
|
|
|
| def run_algorithm(algo_name, df, timeout_sec=600): |
| """Run a single algorithm on a dataset. |
| |
| Returns: |
| dict with keys: adjmat, runtime, status, output_type |
| """ |
| if algo_name not in ALGORITHM_POOL: |
| raise ValueError(f"Unknown algorithm: {algo_name}") |
| |
| algo = ALGORITHM_POOL[algo_name] |
| func = algo['func'] |
| kwargs = algo['kwargs'].copy() |
| |
| adjmat, runtime, status = safe_run( |
| algo_name, func, timeout_sec, df, **kwargs |
| ) |
| |
| return { |
| 'adjmat': adjmat, |
| 'runtime': runtime, |
| 'status': status, |
| 'output_type': algo['output_type'], |
| 'family': algo['family'], |
| } |
|
|
|
|
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.INFO) |
| |
| |
| from causal_selection.data.generator import load_bn_model, sample_dataset |
| |
| model = load_bn_model('asia') |
| df = sample_dataset(model, 1000, seed=0) |
| |
| print(f"Testing on ASIA (N=1000)...") |
| for algo_name in ALGORITHM_POOL: |
| result = run_algorithm(algo_name, df, timeout_sec=60) |
| status = result['status'] |
| runtime = result['runtime'] |
| if result['adjmat'] is not None: |
| n_edges = result['adjmat'].sum() |
| print(f" {algo_name:15s}: {status:10s} {runtime:6.2f}s edges={n_edges}") |
| else: |
| print(f" {algo_name:15s}: {status:20s} {runtime:6.2f}s") |
|
|