| from typing import Dict, Any |
| from aiflows.base_flows.atomic import AtomicFlow |
class UpdatePlanAtomicFlow(AtomicFlow):
    """Overwrite the plan file with the updated plan supplied by the controller.

    The controller is expected to reflect on the existing plan and pass the
    complete updated plan to this flow. This design (controller reflects on the
    existing plan, then updates it) is intended to keep the controller aware of
    the plan it has. One drawback is that the process is not deterministic.

    *Input Interface*
    - `updated_plan`: the updated plan in string format
    - `memory_files`: a mapping that must contain the key `plan`, whose value
      is the location of the plan file to overwrite

    *Output Interface*
    - `result`: the result of the update plan operation
    - `summary`: a short summary of the operation

    *Configuration Parameters*:
    - `input_interface`: the input interface of the flow, default: `["updated_plan"]`
    - `output_interface`: the output interface of the flow, default: `["result"]`

    """

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Check that the input data contains the plan file location.

        :param input_data: the input data to the flow
        :type input_data: Dict[str, Any]
        :raises AssertionError: if `memory_files` is missing from the input data
        :raises AssertionError: if `memory_files` does not contain the key `plan`
        """
        # Raise explicitly instead of using `assert`: assert statements are
        # removed when Python runs with -O, which would silently disable this
        # validation. AssertionError is kept so existing callers are unaffected.
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to UpdatePlanAtomicFlow.yaml")
        if "plan" not in input_data["memory_files"]:
            raise AssertionError("plan not in memory_files")

    def _call(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Write the updated plan to the plan file, replacing its previous content.

        Any exception raised while reading the inputs or writing the file is
        caught and reported through the returned dictionary instead of being
        propagated.

        :param input_data: the input data to the flow
        :type input_data: Dict[str, Any]
        :return: a dictionary with the keys `result` and `summary`
        :rtype: Dict[str, Any]
        """
        try:
            plan_file_location = input_data["memory_files"]["plan"]
            plan_to_write = input_data["updated_plan"]
            # Mode 'w' truncates the file, so the previous plan is fully replaced.
            # NOTE(review): the platform default encoding is used here; confirm
            # it matches whatever reads the plan file back.
            with open(plan_file_location, 'w') as file:
                file.write(plan_to_write + "\n")
            return {
                "result": "updated plan saved to the plan file and has overriden the previous plan",
                "summary": f"Coder/UpdatePlanFlow: updated plan saved to {plan_file_location}"
            }
        except Exception as e:
            # Failures are reported as a result rather than raised.
            return {
                "result": f"Error occurred: {str(e)}",
                "summary": f"Coder/UpdatePlanFlow: error occurred while writing updated plan: {str(e)}"
            }

    def run(
            self,
            input_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate the input data, then write the updated plan.

        :param input_data: the input data to the flow
        :type input_data: Dict[str, Any]
        :return: the output data of the flow
        :rtype: Dict[str, Any]
        :raises AssertionError: if the input data fails validation
        """
        self._check_input(input_data)
        return self._call(input_data)