| import os |
|
|
| import hydra |
|
|
| import aiflows |
| from aiflows.backends.api_info import ApiInfo |
| from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys |
|
|
| from aiflows import logging |
| from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache |
|
|
| from aiflows.utils import serving |
| from aiflows.workers import run_dispatch_worker_thread |
| from aiflows.messages import FlowMessage |
| from aiflows.interfaces import KeyInterface |
| from aiflows.utils.colink_utils import start_colink_server |
| from aiflows import flow_verse |
|
|
|
|
# Flow modules this demo needs. The "revision" is the absolute path of the
# parent of the current working directory, so the script has to be launched
# from inside the demo folder.
# (presumably flow_verse treats a filesystem path as a local checkout of the
# module rather than a remote revision — confirm against the flow_verse docs)
dependencies = [
    dict(
        url="aiflows/FunSearchFlowModule",
        revision=os.path.abspath("../"),
    ),
]
flow_verse.sync_dependencies(dependencies)

# Run the demo with debug-level logging.
logging.set_verbosity_debug()
|
|
|
|
if __name__ == "__main__":
    # End-to-end demo: serve the FunSearch SamplerFlow, instantiate it with
    # the overrides from ./demo.yaml, send one request made of a partial
    # program (`code`) plus its problem statement and evaluation harness
    # (`header`), then print the reply.

    # Start the CoLink server; `cl` is the handle every later call needs.
    cl = start_colink_server()

    # Serve the SamplerFlow class under the "SamplerFlow" endpoint.
    serving.recursive_serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.FunSearchFlowModule.SamplerFlow",
        flow_endpoint="SamplerFlow",
    )

    # Start a dispatch worker thread on the same server.
    run_dispatch_worker_thread(cl)

    # Configuration overrides are read relative to the current directory.
    config_overrides = read_yaml_file(os.path.join(".", "demo.yaml"))

    # NOTE(review): os.getenv returns None when OPENAI_API_KEY is unset; the
    # script does not check for that here, so a missing key would only
    # surface later — confirm whether failing fast is desirable.
    api_information = [
        ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))
    ]

    # Inject the API information into the overrides under "api_infos".
    quick_load_api_keys(config_overrides, api_information, key="api_infos")

    # Obtain an instance of the served flow configured with the overrides.
    funsearch_proxy = serving.get_flow_instance(
        cl=cl,
        flow_endpoint="SamplerFlow",
        config_overrides=config_overrides,
    )

    # Partial program sent as the 'code' field: the evaluation function, one
    # scored candidate (v0) and the stub of the next candidate (v1).
    # The literal must stay at column 0 so the embedded source keeps its own
    # indentation.
    # NOTE(review): indentation inside this literal and the `header` literal
    # follows standard 4-space Python layout — confirm against the upstream
    # prompt text.
    code = """
#function used to evaluate the program:
def evaluate(solve_function: str, tests_inputs: List[str], expected_outputs: str) -> float:
    \"\"\"Returns the score of the solve function we're evolving based on the tests_inputs and expected_outputs.
    Scores are between 0 and 1, unless the program fails to run, in which case the score is -1.
    \"\"\"
    if solve(solve_function, tests_inputs, expected_outputs) == True:
        return 1.0
    return 0.0


def solve_function_v0(input) -> str:
    \"\"\"Scores per test: test_1:{'score': 1.0, 'feedback': 'No feedback available.'} test_2:{'score': 1.0, 'feedback': 'No feedback available.'} test_3:{'score': 0.0, 'feedback': 'No feedback available.'} test_4:{'score': -1, 'feedback': 'Invalid Format of prediction'}\"\"\"
    return 'YES'


def solve_function_v1(input) -> str:
    \"\"\"Improved version of solve_function_v0\"\"\"
"""

    # Problem statement plus evaluation harness sent as the 'header' field.
    # Backslashes in "\\leq" / "\\cdot" are doubled because "\l" and "\c" are
    # not valid escape sequences in a normal string literal (SyntaxWarning on
    # Python 3.12+); the resulting runtime text still contains a single
    # backslash, exactly as before.
    header = """
\"\"\"Problem Description:
Serval has a string s that only consists of 0 and 1 of length n. The i-th character of s is denoted as s_i, where 1\\leq i\\leq n.
Serval can perform the following operation called Inversion Magic on the string s:
Choose an segment [l, r] (1\\leq l\\leq r\\leq n). For l\\leq i\\leq r, change s_i into 1 if s_i is 0, and change s_i into 0 if s_i is 1.
For example, let s be 010100 and the segment [2,5] is chosen. The string s will be 001010 after performing the Inversion Magic.
Serval wants to make s a palindrome after performing Inversion Magic exactly once. Help him to determine whether it is possible.
A string is a palindrome iff it reads the same backwards as forwards. For example, 010010 is a palindrome but 10111 is not.

Input Description:
Input
Each test contains multiple test cases. The first line contains the number of test cases t (1\\leq t\\leq 10^4). The description of the test cases follows.
The first line of each test case contains a single integer n (2\\leq n\\leq 10^5) — the length of string s.
The second line of each test case contains a binary string s of length n. Only characters 0 and 1 can appear in s.
It's guaranteed that the sum of n over all test cases does not exceed 2\\cdot 10^5.

Output Description:
Output
For each test case, print Yes if s can be a palindrome after performing Inversion Magic exactly once, and print No if not.
You can output Yes and No in any case (for example, strings yEs, yes, Yes and YES will be recognized as a positive response).

Public Tests:
Test 1:
Input: ['1', '4', '1001']
Output: 'YES'
Test 2:
Input: ['1', '5', '10010']
Output: 'YES'
Test 3:
Input: ['1', '7', '0111011']
Output: 'NO'

\"\"\"


import ast
import itertools
import numpy as np
from typing import List

def solve(solve_function: str,input: List[str], expected_output: str) -> str:
    \"\"\"function used to run the solve function on input *kwargs and return the the predicted output

    :param solve_function: the function to run (the solve function below as a string)
    :type solve_function: str
    :param kwargs: the inputs to the solve function
    :type kwargs: List[str]
    \"\"\"
    local_namespace = {}
    exec(solve_function,local_namespace)
    found_name, program_name = get_function_name_from_code(solve_function)

    if not found_name:
        raise ValueError(f"Function name not found in program: {solve_function}")

    solve_fn = local_namespace.get(program_name)

    prediction = solve_fn(input)

    prediction = prediction.split()
    expected_output = expected_output.split()

    if len(prediction) != len(expected_output):
        raise ValueError(f"Invalid Format of prediction")

    for i in range(len(prediction)):
        if prediction[i] != expected_output[i]:
            return False

    return True

def evaluate(solve_function: str, tests_inputs: List[str], expected_outputs: str) -> float:
    \"\"\"Returns the score of the solve function we're evolving based on the tests_inputs and expected_outputs.
    Scores are between 0 and 1, unless the program fails to run, in which case the score is -1.
    \"\"\"
    if solve(solve_function,tests_inputs,expected_outputs) == True:
        return 1.0
    return 0.0


def get_function_name_from_code(code):
    tree = ast.parse(code)
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            return True, node.name

    # something is wrong
    return False, None

"""

    # Payload of the request: the two prompt fragments defined above.
    data = {
        'code': code,
        'header': header
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    # NOTE(review): the same message is passed to send_message and then to
    # get_reply_future. If get_reply_future also dispatches the message, the
    # request is sent twice — confirm against the aiflows Flow API and drop
    # the send_message call if so.
    funsearch_proxy.send_message(input_message)

    future = funsearch_proxy.get_reply_future(input_message)
    # Fetch the reply payload from the future and print it.
    response = future.get_data()
    print("~~~Response~~~")
    print(response)
|
|