| |
| |
|
|
| import os |
| import json |
| import random |
| import tarfile |
| import requests |
| import datetime |
| import numpy as np |
| import pandas as pd |
| from ..utils import make_parent_folder |
| from ...core.logging import logger |
| from ...core.module_utils import load_json, save_json |
|
|
|
|
# Per-benchmark mapping from split name ("train" / "dev" / "test", plus
# "test_cases" for the code-generation benchmarks) to the .jsonl file shipped
# inside the AFlow data archive. A value of None means the archive contains
# no file for that split.
AFLOW_DATASET_FILES_MAP = {
    "hotpotqa": {"train": None, "dev": "hotpotqa_validate.jsonl", "test": "hotpotqa_test.jsonl"},
    "humaneval": {"train": None, "dev": "humaneval_validate.jsonl", "test": "humaneval_test.jsonl", "test_cases": "humaneval_public_test.jsonl"},
    "mbpp": {"train": None, "dev": "mbpp_validate.jsonl", "test": "mbpp_test.jsonl", "test_cases": "mbpp_public_test.jsonl"},
    "gsm8k": {"train": None, "dev": "gsm8k_validate.jsonl", "test": "gsm8k_test.jsonl"},
    "math": {"train": None, "dev": "math_validate.jsonl", "test": "math_test.jsonl"},
}
|
|
def extract_tar_gz(filename: str, extract_path: str) -> None:
    """Extract a ``.tar.gz`` archive into *extract_path*.

    Uses the ``"data"`` extraction filter when the interpreter supports it
    (Python 3.12+, backported to 3.11.4 / 3.10.12) to reject path-traversal
    members such as absolute paths or ``..`` components (CVE-2007-4559) —
    this archive is downloaded from the network and should be treated as
    untrusted. Falls back to the legacy unfiltered extraction on older
    interpreters.

    Args:
        filename: Path to the ``.tar.gz`` archive.
        extract_path: Directory the archive contents are written into.

    Raises:
        tarfile.TarError: If the archive is corrupt or (on filter-aware
            Pythons) contains unsafe members.
    """
    with tarfile.open(filename, "r:gz") as tar:
        try:
            tar.extractall(path=extract_path, filter="data")
        except TypeError:
            # Older Python: extractall() has no `filter` keyword.
            tar.extractall(path=extract_path)
|
|
|
|
def download_aflow_benchmark_data(dataset: str, save_folder: str):
    """Download the AFlow benchmark archive and keep the files for *dataset*.

    Downloads a single tarball containing every benchmark's splits, extracts
    it into *save_folder*, and — unless *dataset* is ``"all"`` — deletes the
    extracted files that do not belong to the requested benchmark. The
    downloaded tarball is removed afterwards.

    Args:
        dataset: One of the ``AFLOW_DATASET_FILES_MAP`` keys or ``"all"``
            (case-insensitive).
        save_folder: Destination folder for the extracted ``.jsonl`` files.

    Raises:
        ValueError: If *dataset* is not a known benchmark name.
        requests.HTTPError: If the download request fails.
    """
    candidate_datasets = list(AFLOW_DATASET_FILES_MAP.keys()) + ["all"]
    # Bug fix: validation used to be case-insensitive while the map lookup
    # below was not, so e.g. dataset="GSM8K" passed validation and then
    # raised KeyError. Normalize once and use the normalized value throughout.
    dataset = dataset.lower()
    if dataset not in candidate_datasets:
        raise ValueError(f"Invalid value for dataset: {dataset}. Available choices: {candidate_datasets}")

    url = "https://drive.google.com/uc?export=download&id=1DNoegtZiUhWtvkd2xoIuElmIi4ah7k8e"
    logger.info(f"Downloading AFlow benchmark data from {url} ...")
    aflow_data_save_file = os.path.join(save_folder, "aflow_data.tar.gz")

    make_parent_folder(aflow_data_save_file)
    # Stream to disk in chunks; the timeout guards against a hung connection
    # (it bounds connect/read waits, not the total download time).
    response = requests.get(url, stream=True, timeout=60)
    response.raise_for_status()
    with open(aflow_data_save_file, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                file.write(chunk)

    logger.info(f"Extracting data for {dataset} dataset(s) from {aflow_data_save_file} ...")
    try:
        extract_tar_gz(aflow_data_save_file, extract_path=save_folder)

        if dataset != "all":
            # Keep only the split files that belong to the requested benchmark;
            # everything else extracted from the archive is removed.
            dataset_files = [file for file in AFLOW_DATASET_FILES_MAP[dataset].values() if file is not None]
            for file in os.listdir(save_folder):
                if file not in dataset_files:
                    os.remove(os.path.join(save_folder, file))
    finally:
        # Always clean up the downloaded archive, even if extraction fails.
        if os.path.exists(aflow_data_save_file):
            logger.info(f"Remove {aflow_data_save_file}")
            os.remove(aflow_data_save_file)
|
|
|
|
class DataUtils:
    """Utilities for persisting and sampling AFlow optimization round results.

    Results live in ``results.json`` files: lists of dicts with the keys
    ``round``, ``score``, ``avg_cost``, ``total_cost`` and ``time``. This
    class aggregates per-round scores, samples high-scoring rounds with a
    uniform/softmax mixture, and loads execution logs for prompting.
    """

    def __init__(self, root_path: str):
        """Store the experiment root folder and reset the score cache.

        Args:
            root_path: Folder containing the per-round subfolders
                (``round_<n>``) and the top-level ``results.json``.
        """
        self.root_path = root_path
        # Cached [{"round": int, "score": float}, ...] sorted by score
        # descending; populated by _load_scores().
        self.top_scores = []

    def load_results(self, path: str) -> list:
        """Return the parsed ``results.json`` located under *path*.

        Returns an empty list when the file is missing or unparsable so
        callers can append and re-save unconditionally.
        """
        result_path = os.path.join(path, "results.json")
        if os.path.exists(result_path):
            with open(result_path, "r") as json_file:
                try:
                    return json.load(json_file)
                except json.JSONDecodeError:
                    # Corrupt or partially-written file: treat as "no results yet".
                    return []
        return []

    def get_top_rounds(self, sample: int, path=None, mode="Graph"):
        """Return up to *sample* top-scoring rounds, deduplicated by round number.

        Round 0 (the seed workflow) is always included first when present;
        the remaining slots are filled in descending score order.

        Args:
            sample: Maximum number of rounds to return.
            path: Results folder used when *mode* is not ``"Graph"``.
            mode: ``"Graph"`` reads from ``self.root_path``.

        Returns:
            list[dict]: Items of the form ``{"round": int, "score": float}``.
        """
        self._load_scores(path, mode)
        unique_rounds = set()
        unique_top_scores = []

        # Always seed with round 0 so the baseline stays in the sample pool.
        first_round = next((item for item in self.top_scores if item["round"] == 0), None)
        if first_round:
            unique_top_scores.append(first_round)
            unique_rounds.add(0)

        for item in self.top_scores:
            if item["round"] not in unique_rounds:
                unique_top_scores.append(item)
                unique_rounds.add(item["round"])
                if len(unique_top_scores) >= sample:
                    break

        return unique_top_scores

    def select_round(self, items):
        """Pick one round via a score-weighted random draw.

        Args:
            items: Non-empty list of ``{"round", "score"}`` dicts.

        Returns:
            dict: The selected item.

        Raises:
            ValueError: If *items* is empty.
        """
        if not items:
            raise ValueError("Item list is empty.")

        sorted_items = sorted(items, key=lambda x: x["score"], reverse=True)
        # Scale to percentages so the softmax temperature (alpha) operates
        # on a 0-100 range rather than 0-1.
        scores = [item["score"] * 100 for item in sorted_items]

        probabilities = self._compute_probabilities(scores)
        logger.info(f"\nMixed probability distribution: {probabilities}")
        logger.info(f"\nSorted rounds: {sorted_items}")

        selected_index = np.random.choice(len(sorted_items), p=probabilities)
        logger.info(f"\nSelected index: {selected_index}, Selected item: {sorted_items[selected_index]}")

        return sorted_items[selected_index]

    def _compute_probabilities(self, scores, alpha=0.2, lambda_=0.3):
        """Mix a uniform distribution with a softmax over *scores*.

        ``lambda_`` controls exploration (uniform mass) versus exploitation
        (softmax mass); ``alpha`` is the softmax temperature.

        Args:
            scores: Sequence of numeric scores.
            alpha: Softmax sharpness; larger favors high scores more strongly.
            lambda_: Weight of the uniform component, in [0, 1].

        Returns:
            np.ndarray: Probabilities aligned with *scores*, summing to 1.

        Raises:
            ValueError: If *scores* is empty or the softmax underflows to 0.
        """
        scores = np.array(scores, dtype=np.float64)
        n = len(scores)

        if n == 0:
            raise ValueError("Score list is empty.")

        uniform_prob = np.full(n, 1.0 / n, dtype=np.float64)

        # Subtract the max before exponentiating for numerical stability.
        max_score = np.max(scores)
        shifted_scores = scores - max_score
        exp_weights = np.exp(alpha * shifted_scores)

        sum_exp_weights = np.sum(exp_weights)
        if sum_exp_weights == 0:
            raise ValueError("Sum of exponential weights is 0, cannot normalize.")

        score_prob = exp_weights / sum_exp_weights

        mixed_prob = lambda_ * uniform_prob + (1 - lambda_) * score_prob

        # Re-normalize to absorb floating-point drift.
        total_prob = np.sum(mixed_prob)
        if not np.isclose(total_prob, 1.0):
            mixed_prob = mixed_prob / total_prob

        return mixed_prob

    def load_log(self, cur_round, path=None, mode: str = "Graph"):
        """Return up to three random log entries as pretty-printed JSON.

        Args:
            cur_round: Round whose ``log.json`` is read (Graph mode only).
            path: Explicit log file path when *mode* is not ``"Graph"``.
            mode: Source selection; ``"Graph"`` derives the path from
                ``self.root_path``.

        Returns:
            str: Concatenated JSON samples, or ``""`` when no log is available.
        """
        if mode == "Graph":
            log_dir = os.path.join(self.root_path, f"round_{cur_round}", "log.json")
        else:
            log_dir = path

        # Bug fix: in non-Graph mode `path` may be None, and
        # os.path.exists(None) raises TypeError — treat it as "no log".
        if not log_dir or not os.path.exists(log_dir):
            return ""
        logger.info(log_dir)

        data = load_json(log_dir, type="json")

        # Normalize to a list of entries.
        if isinstance(data, dict):
            data = [data]
        elif not isinstance(data, list):
            data = list(data)

        if not data:
            return ""

        sample_size = min(3, len(data))
        random_samples = random.sample(data, sample_size)

        log = ""
        for sample in random_samples:
            log += json.dumps(sample, indent=4, ensure_ascii=False) + "\n\n"

        return log

    def get_results_file_path(self, graph_path: str) -> str:
        """Return the ``results.json`` path inside *graph_path*."""
        return os.path.join(graph_path, "results.json")

    def create_result_data(self, round: int, score: float, avg_cost: float, total_cost: float) -> dict:
        """Build one results entry stamped with the current local time."""
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"round": round, "score": score, "avg_cost": avg_cost, "total_cost": total_cost, "time": now}

    def save_results(self, json_file_path: str, data: list):
        """Write *data* to *json_file_path* as indented JSON."""
        save_json(data, json_file_path, type="json", use_indent=True)

    def _load_scores(self, path=None, mode="Graph"):
        """Refresh ``self.top_scores`` from a ``results.json`` file.

        Scores are averaged per round, then sorted descending by score.

        Args:
            path: Folder holding ``results.json`` when *mode* != ``"Graph"``.
            mode: ``"Graph"`` reads from ``self.root_path``.

        Returns:
            list[dict]: The refreshed ``self.top_scores``.
        """
        if mode == "Graph":
            rounds_dir = self.root_path
        else:
            rounds_dir = path

        result_file = os.path.join(rounds_dir, "results.json")
        self.top_scores = []

        # Robustness fix: before any round has been evaluated the results
        # file does not exist yet — return the empty cache instead of crashing.
        if not os.path.exists(result_file):
            return self.top_scores

        data = load_json(result_file, type="json")
        df = pd.DataFrame(data)

        # Average over multiple evaluations of the same round.
        scores_per_round = df.groupby("round")["score"].mean().to_dict()

        for round_number, average_score in scores_per_round.items():
            self.top_scores.append({"round": round_number, "score": average_score})

        self.top_scores.sort(key=lambda x: x["score"], reverse=True)

        return self.top_scores
| |
|
|
def test_case_2_test_function(solution: str, test_case: str, entry_point: str):
    """Render a standalone Python script that runs *test_case* against *solution*.

    Args:
        solution: Full source of the candidate solution (must define the
            *entry_point* function).
        test_case: Assertion statement(s) referencing ``candidate``; inserted
            into the body of the generated ``check`` function.
        entry_point: Name of the function under test; passed to ``check``.

    Returns:
        str: Python source that embeds the solution, defines ``check`` and
        ``test_check``, and calls ``test_check()`` at the bottom — intended
        to be executed, with failures surfacing as exceptions raised by the
        inserted assertions.
    """
    # NOTE(review): the template's internal indentation was reconstructed from
    # a formatting-mangled source; it assumes `test_case` is a single statement
    # (a multi-line test_case would need re-indenting) — TODO confirm against
    # the benchmark data this is used with.
    tester_function = f"""
{solution}

def check(candidate):
    {test_case}

def test_check():
    check({entry_point})

test_check()
"""
    return tester_function
| |