| import traceback |
| from copy import deepcopy |
| from typing import Dict, Any |
|
|
| from .code_interpreters.create_code_interpreter import create_code_interpreter |
| from aiflows.messages import FlowMessage |
| from aiflows.base_flows import AtomicFlow |
|
|
| def truncate_output(data, max_output_chars=2000): |
| needs_truncation = False |
|
|
| message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n' |
|
|
| |
| if data.startswith(message): |
| data = data[len(message):] |
| needs_truncation = True |
|
|
| |
| if len(data) > max_output_chars or needs_truncation: |
| data = message + data[-max_output_chars:] |
|
|
| return data |
|
|
|
|
| class InterpreterAtomicFlow(AtomicFlow): |
| """This flow is used to run the code passed from the caller. |
| *Input Interface*: |
| - `code` |
| - `language` |
| |
| *Output Interface*: |
| - `interpreter_output`: output of the code interpreter |
| |
| *Configuration Parameters*: |
| - max_output: maximum number of characters to display in the output |
| |
| **Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter) |
| for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()** |
| |
| I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter |
| is not compatible with that of the current version of aiflows(v.0.1.7). |
| """ |
| def __init__(self, |
| **kwargs): |
| super().__init__(**kwargs) |
| self.max_output = self.flow_config["max_output"] |
| self._code_interpreters = {} |
|
|
| def set_up_flow_state(self): |
| """ class-specific flow state: language and code, which describes the programming language and the code to run. |
| """ |
| super().set_up_flow_state() |
| self.flow_state["language"] = None |
| self.flow_state["code"] = "" |
|
|
| def _state_update_add_language_and_code(self, |
| language: str, |
| code: str) -> None: |
| """ |
| updates the language and code passed from _process_input_data |
| to the flow state |
| :param language: the programming language |
| :param code: the code to run |
| """ |
| self.flow_state["language"] = language |
| self.flow_state["code"] = code |
|
|
| def _check_input(self, input_data: Dict[str, Any]): |
| """ Sanity check of input data |
| :param input_data: input data |
| :type input_data: Dict[str, Any] |
| """ |
| |
| assert "language" in input_data, "attribute 'language' not in input data." |
| assert "code" in input_data, "attribute 'code' not in input data." |
|
|
|
|
| def _process_input_data(self, input_data: Dict[str, Any]): |
| """ Allocate interpreter if any, pass input data into flow state |
| :param input_data: input data |
| :type input_data: Dict[str, Any] |
| """ |
| |
| if input_data["language"] == "python" and input_data["code"].startswith("!"): |
| input_data["language"] = "shell" |
| input_data["code"] = input_data["code"][1:] |
|
|
| |
| |
| |
| language = input_data["language"] |
| if language not in self._code_interpreters: |
| self._code_interpreters[language] = create_code_interpreter(language) |
|
|
| |
| self._state_update_add_language_and_code( |
| language=language, |
| code=input_data["code"] |
| ) |
|
|
| def _call(self): |
| """ This method runs the code interpreter and returns the output. (runs the code interpreter and returns the output.) |
| """ |
|
|
| output = "" |
| try: |
| code_interpreter = self._code_interpreters[self.flow_state["language"]] |
| code = self.flow_state["code"] |
| for line in code_interpreter.run(code): |
| if "output" in line: |
| output += "\n" + line["output"] |
| |
| output = truncate_output(output, self.max_output) |
| output = output.strip() |
| except: |
| output = traceback.format_exc() |
| output = output.strip() |
|
|
| return output |
|
|
| def run( |
| self, |
| input_message: FlowMessage): |
| """ Run the code interpreter and return the output. |
| :param input_message: The input message of the flow. |
| :type input_message: FlowMessage |
| """ |
| input_data = input_message.data |
| self._check_input(input_data) |
| self._process_input_data(input_data) |
| |
| output = self._call() |
| |
| response = { |
| "interpreter_output": output, |
| } |
| |
| reply = self.package_output_message( |
| input_message=input_message, |
| response = response |
| ) |
| |
| self.send_message(reply) |
| |
|
|