From fa530ea2e8451ca23879aeb5421f7a15c7d6c9b0 Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Wed, 10 Jul 2024 22:51:40 -0700 Subject: [PATCH] replay working for both seq and hier just need tests --- src/crewai/crew.py | 333 ++++++++++++++------------- src/crewai/utilities/file_handler.py | 16 +- tests/crew_test.py | 11 +- 3 files changed, 189 insertions(+), 171 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index e56d72d06..64f77a0ff 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -33,11 +33,11 @@ from crewai.telemetry import Telemetry from crewai.tools.agent_tools import AgentTools from crewai.utilities import I18N, FileHandler, Logger, RPMController from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE -from crewai.utilities.crew_json_encoder import CrewJSONEncoder from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.file_handler import TaskOutputJsonHandler from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, + aggregate_raw_outputs_from_tasks, ) from crewai.utilities.training_handler import CrewTrainingHandler @@ -392,11 +392,10 @@ class Crew(BaseModel): ) -> CrewOutput: """Starts the crew to work on its assigned tasks.""" self._execution_span = self._telemetry.crew_execution_span(self, inputs) - self.execution_logs = [] self._task_output_handler.reset() if inputs is not None: self._inputs = inputs - # self._interpolate_inputs(inputs) + self._interpolate_inputs(inputs) self._set_tasks_callbacks() i18n = I18N(prompt_file=self.prompt_file) @@ -422,7 +421,7 @@ class Crew(BaseModel): if self.process == Process.sequential: result = self._run_sequential_process() elif self.process == Process.hierarchical: - result = self._run_hierarchical_process() # type: ignore # Incompatible types in assignment (expression has type "str | dict[str, Any]", variable has type "str") + result = self._run_hierarchical_process() else: raise NotImplementedError( f"The process '{self.process}' is not implemented yet." @@ -510,7 +509,13 @@ class Crew(BaseModel): return results - def _store_execution_log(self, task: Task, output, task_index): + def _store_execution_log( + self, + task: Task, + output: TaskOutput, + task_index: int, + was_replayed: bool = False, + ): if self._inputs: inputs = self._inputs else: @@ -521,21 +526,28 @@ class Crew(BaseModel): "expected_output": task.expected_output, "agent_role": task.agent.role if task.agent else "None", "output": { - "description": task.description, - "summary": task.description, - "raw_output": output.raw_output, - "pydantic_output": output.pydantic_output, - "json_output": output.json_output, - "agent": task.agent.role if task.agent else "None", + "description": output.description, + "summary": output.summary, + "raw": output.raw, + "pydantic": output.pydantic, + "json_dict": output.json_dict, + "output_format": output.output_format, + "agent": output.agent, }, "timestamp": datetime.now().isoformat(), "task_index": task_index, "inputs": inputs, + "was_replayed": was_replayed, } - self.execution_logs.append(log) - self._task_output_handler.append(log) + # Update the existing log or append if it's a new entry + if task_index < len(self.execution_logs): + self.execution_logs[task_index] = log + else: + self.execution_logs.append(log) - def _run_sequential_process(self): + self._task_output_handler.update(task_index, log) + + def _run_sequential_process(self) -> CrewOutput: """Executes tasks sequentially and returns the final output.""" self.execution_logs = [] return self._execute_tasks(self.tasks) @@ -544,7 +556,7 @@ class Crew(BaseModel): self, tasks: List[Task], manager: Optional[BaseAgent] = None, - ): + ) -> CrewOutput: task_outputs: List[TaskOutput] = [] futures: List[Tuple[Task, Future[TaskOutput], int]] = [] for task_index, task in enumerate(tasks): @@ -576,7 +588,7 @@ class Crew(BaseModel): agent=role, task=task.description, status="started" ) if task.async_execution: - context = aggregate_raw_outputs_from_task_outputs(task_outputs) + context = self._set_context(task, task_outputs) if agent_to_use: future = task.execute_async( agent=agent_to_use, @@ -594,8 +606,7 @@ class Crew(BaseModel): task_outputs = self._process_async_tasks(futures) futures.clear() - context = aggregate_raw_outputs_from_task_outputs(task_outputs) - + context = self._set_context(task, task_outputs) if agent_to_use: task_output = task.execute_sync( agent=agent_to_use, @@ -609,7 +620,29 @@ class Crew(BaseModel): if futures: task_outputs = self._process_async_tasks(futures) - return task_outputs + final_task_output = task_outputs[0] + + final_string_output = final_task_output.raw + self._finish_execution(final_string_output) + + token_usage = self.calculate_usage_metrics() + + return CrewOutput( + raw=final_task_output.raw, + pydantic=final_task_output.pydantic, + json_dict=final_task_output.json_dict, + tasks_output=[task.output for task in self.tasks if task.output], + token_usage=token_usage, + ) + # return task_outputs + + def _set_context(self, task: Task, task_outputs: List[TaskOutput]): + context = ( + aggregate_raw_outputs_from_tasks(task.context) + if task.context + else aggregate_raw_outputs_from_task_outputs(task_outputs) + ) + return context def _process_task_result(self, task: Task, output: TaskOutput) -> None: role = task.agent.role if task.agent is not None else "None" @@ -620,14 +653,16 @@ class Crew(BaseModel): def _process_async_tasks( self, futures: List[Tuple[Task, Future[TaskOutput], int]], + was_replayed: bool = False, ) -> List[TaskOutput]: task_outputs = [] for future_task, future, task_index in futures: task_output = future.result() task_outputs.append(task_output) self._process_task_result(future_task, task_output) - self._store_execution_log(future_task, task_output, task_index) - + self._store_execution_log( + future_task, task_output, task_index, was_replayed + ) return task_outputs def _get_agent(self, role: str) -> Optional[BaseAgent]: @@ -645,10 +680,31 @@ class Crew(BaseModel): return agent return None - def replay_from_task_easy(self, task_id: str, inputs: Dict[str, Any] | None = None): - task_outputs = [] - stored_outputs = self._load_stored_outputs() - start_index = next( + def _initialize_execution(self, inputs: Optional[Dict[str, Any]]) -> None: + """Initializes the execution by setting up necessary attributes and states.""" + self._execution_span = self._telemetry.crew_execution_span(self, inputs) + # self.execution_logs = [] + self._task_output_handler.reset() + if inputs is not None: + self._inputs = inputs + self._interpolate_inputs(inputs) + self._set_tasks_callbacks() + i18n = I18N(prompt_file=self.prompt_file) + for agent in self.agents: + agent.i18n = i18n + agent.crew = self # type: ignore[attr-defined] + if not agent.function_calling_llm: # type: ignore[attr-defined] + agent.function_calling_llm = self.function_calling_llm # type: ignore[attr-defined] + if agent.allow_code_execution: # type: ignore[attr-defined] + agent.tools += agent.get_code_execution_tools() # type: ignore[attr-defined] + if not agent.step_callback: # type: ignore[attr-defined] + agent.step_callback = self.step_callback # type: ignore[attr-defined] + agent.create_agent_executor() + + def _find_task_index( + self, task_id: str, stored_outputs: List[Dict[str, Any]] + ) -> Optional[int]: + return next( ( index for (index, d) in enumerate(stored_outputs) @@ -656,80 +712,20 @@ class Crew(BaseModel): ), None, ) - # todo: depending if its seqential or hierarchial how can we run this properly? - if self.process == Process.sequential: - for task in self.tasks[start_index:]: - context = aggregate_raw_outputs_from_task_outputs(task_outputs) - task_output = task.execute_sync( - agent=task.agent, context=context, tools=task.tools - ) - task_outputs.append(task_output) - self._process_task_result(task, task_output) - self._store_execution_log(task, task_output, start_index) - start_index += 1 - - elif self.process == Process.hierarchical: - manager_task = self.tasks[start_index] - context = aggregate_raw_outputs_from_task_outputs(task_outputs) - task_output = manager_task.execute_sync( - agent=manager_task.agent, context=context, tools=manager_task.tools - ) - task_outputs.append(task_output) - self._process_task_result(manager_task, task_output) - self._store_execution_log(manager_task, task_output, start_index) - - # Assuming hierarchical process involves manager delegating tasks to other agents - for task in self.tasks[start_index + 1 :]: - context = aggregate_raw_outputs_from_task_outputs(task_outputs) - task_output = task.execute_sync( - agent=task.agent, context=context, tools=task.tools - ) - task_outputs.append(task_output) - self._process_task_result(task, task_output) - self._store_execution_log(task, task_output, start_index) - start_index += 1 - - else: - raise NotImplementedError( - f"The process '{self.process}' is not implemented yet." - ) - - # this works sequentially - def replay_from_task(self, task_id: str, inputs: Dict[str, Any] | None = None): - all_tasks = self.tasks.copy() + def replay_from_task( + self, task_id: str, inputs: Dict[str, Any] | None = None + ) -> CrewOutput: stored_outputs = self._load_stored_outputs() - start_index = next( - ( - index - for (index, d) in enumerate(stored_outputs) - if d["task_id"] == str(task_id) - ), - None, - ) - # Generate tasks based on what was previously replayed - if len(self.tasks) != len(stored_outputs): - for output in stored_outputs[start_index:]: - matching_index = output["task_index"] - matching_task = self.tasks[matching_index] - if matching_task: - new_task = matching_task.copy() - new_task.agent = self._get_agent(output["agent_role"]) - all_tasks.append(new_task) + start_index = self._find_task_index(task_id, stored_outputs) if start_index is None: raise ValueError(f"Task with id {task_id} not found in the crew's tasks.") - # this handles passing the correct context along and updating following task executions with the new task_ouputs as context - stored_output_map: Dict[str, dict] = { - log["task_id"]: log["output"] for log in stored_outputs - } - task_outputs: List[ TaskOutput ] = [] # will propogate the old outputs first to add context then fill the content with the new task outputs relative to the replay start futures: List[Tuple[Task, Future[TaskOutput], int]] = [] - context = "" # inputs can be overrided with new passed inputs replay_inputs = ( @@ -738,28 +734,46 @@ class Crew(BaseModel): else stored_outputs[start_index].get("inputs", {}) ) - self._inputs = replay_inputs # overriding + self._inputs = replay_inputs if replay_inputs: self._interpolate_inputs(replay_inputs) + if self.process == Process.hierarchical: + self._create_manager_agent() + for task_index, task in enumerate(self.tasks): + if task_index < start_index: # we are skipping this task + stored_output = stored_outputs[task_index]["output"] + task_output = TaskOutput( + description=stored_output["description"], + agent=stored_output["agent"], + raw=stored_output["raw"], + pydantic=stored_output["pydantic"], + json_dict=stored_output["json_dict"], + output_format=stored_output["output_format"], + ) + task_outputs = [task_output] + context = self._set_context(task, task_outputs) - for task_index, task in enumerate(all_tasks): - if task_index < start_index: - # Use stored output for tasks before the replay point - if task.id in stored_output_map: - stored_output = stored_output_map[task.id] - task_output = TaskOutput( - description=stored_output["description"], - raw_output=stored_output["raw_output"], - pydantic_output=stored_output["pydantic_output"], - json_output=stored_output["json_output"], - agent=stored_output["agent"], - ) - task_outputs.append(task_output) - context += ( - f"\nTask {task_index + 1} Output:\n{task_output.raw_output}" - ) else: - role = task.agent.role if task.agent is not None else "None" + if task.agent and task.agent.allow_delegation: + agents_for_delegation = [ + agent for agent in self.agents if agent != task.agent + ] + if len(self.agents) > 1 and len(agents_for_delegation) > 0: + task.tools += task.agent.get_delegation_tools( + agents_for_delegation + ) + + if self.process == Process.hierarchical: + if task.agent and self.manager_agent: + self.manager_agent.tools = task.agent.get_delegation_tools( + [task.agent] + ) + if self.manager_agent: + self.manager_agent.tools = ( + self.manager_agent.get_delegation_tools(self.agents) + ) + agent_to_use = task.agent if task.agent else self.manager_agent + role = agent_to_use.role if agent_to_use is not None else "None" log_color = "bold_blue" self._logger.log( "debug", f"Replaying Working Agent: {role}", color=log_color @@ -776,38 +790,67 @@ class Crew(BaseModel): ) # Execute task for replay and subsequent tasks if task.async_execution: + context = self._set_context(task, task_outputs) future = task.execute_async( - agent=task.agent, context=context, tools=task.tools + agent=agent_to_use, context=context, tools=task.tools ) futures.append((task, future, task_index)) else: if futures: - async_outputs = self._process_async_tasks(futures) - task_outputs.extend(async_outputs) - for output in async_outputs: - context += ( - f"\nTask {task_index + 1} Output:\n{output.raw_output}" - ) + task_outputs = self._process_async_tasks(futures, True) futures.clear() + + context = self._set_context(task, task_outputs) + task_output = task.execute_sync( - agent=task.agent, context=context, tools=task.tools + agent=agent_to_use, context=context, tools=task.tools ) - task_outputs.append(task_output) + task_outputs = [task_output] self._process_task_result(task, task_output) - self._store_execution_log(task, task_output, task_index) - context += ( - f"\nTask {task_index + 1} Output:\n{task_output.raw_output}" + self._store_execution_log( + task, task_output, task_index, was_replayed=True ) # Process any remaining async tasks if futures: - async_outputs = self._process_async_tasks(futures) - task_outputs.extend(async_outputs) - # Calculate usage metrics + task_outputs = self._process_async_tasks(futures, True) + + if len(task_outputs) != 1: + raise ValueError( + "Something went wrong. Kickoff should return only one task output." + ) + final_task_output = task_outputs[0] + final_string_output = final_task_output.raw + self._finish_execution(final_string_output) + token_usage = self.calculate_usage_metrics() - # Format and return the final output - return self._format_output(task_outputs, token_usage) + return CrewOutput( + raw=final_task_output.raw, + pydantic=final_task_output.pydantic, + json_dict=final_task_output.json_dict, + tasks_output=[task.output for task in self.tasks if task.output], + token_usage=token_usage, + ) + + def _create_manager_agent(self): + i18n = I18N(prompt_file=self.prompt_file) + if self.manager_agent is not None: + self.manager_agent.allow_delegation = True + manager = self.manager_agent + if manager.tools is not None and len(manager.tools) > 0: + raise Exception("Manager agent should not have tools") + manager.tools = self.manager_agent.get_delegation_tools(self.agents) + else: + manager = Agent( + role=i18n.retrieve("hierarchical_manager_agent", "role"), + goal=i18n.retrieve("hierarchical_manager_agent", "goal"), + backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), + tools=AgentTools(agents=self.agents).tools(), + llm=self.manager_llm, + verbose=self.verbose, + ) + self.manager_agent = manager def _load_stored_outputs(self) -> List[Dict]: try: @@ -826,37 +869,7 @@ class Crew(BaseModel): ) return [] - def save_execution_logs(self, filename: str | None = None): - """Save execution logs to a file.""" - if filename: - self._log_file = filename - try: - with open(self._log_file, "w") as f: - json.dump(self.execution_logs, f, indent=2, cls=CrewJSONEncoder) - except Exception as e: - self._logger.log("error", f"Failed to save execution logs: {str(e)}") - - def load_execution_logs(self, filename: str | None = None): - """Load execution logs from a file.""" - if filename: - self._log_file = filename - try: - with open(self._log_file, "r") as f: - self.execution_logs = json.load(f) - except FileNotFoundError: - self._logger.log( - "warning", - f"Log file {self._log_file} not found. Starting with empty logs.", - ) - self.execution_logs = [] - except json.JSONDecodeError: - self._logger.log( - "error", - f"Failed to parse log file {self._log_file}. Starting with empty logs.", - ) - self.execution_logs = [] - - def _run_hierarchical_process(self) -> Tuple[CrewOutput, Dict[str, Any]]: + def _run_hierarchical_process(self) -> CrewOutput: """Creates and assigns a manager agent to make sure the crew completes the tasks.""" i18n = I18N(prompt_file=self.prompt_file) if self.manager_agent is not None: @@ -878,16 +891,6 @@ class Crew(BaseModel): return self._execute_tasks(self.tasks, manager) - # final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs) - # self._finish_execution(final_string_output) - - # token_usage = self.calculate_usage_metrics() - - # return ( - # self._format_output(task_outputs, token_usage), - # token_usage, - # ) - def copy(self): """Create a deep copy of the Crew.""" diff --git a/src/crewai/utilities/file_handler.py b/src/crewai/utilities/file_handler.py index 40308e23c..a2a4bee88 100644 --- a/src/crewai/utilities/file_handler.py +++ b/src/crewai/utilities/file_handler.py @@ -1,8 +1,10 @@ import os import pickle -from datetime import datetime import json +from datetime import datetime +from typing import Dict, Any, List + from crewai.utilities.crew_json_encoder import CrewJSONEncoder @@ -97,6 +99,18 @@ class TaskOutputJsonHandler: json.dump(file_data, file, indent=2, cls=CrewJSONEncoder) file.truncate() + def update(self, task_index: int, log: Dict[str, Any]): + logs = self.load() + if task_index < len(logs): + logs[task_index] = log + else: + logs.append(log) + self.save(logs) + + def save(self, logs: List[Dict[str, Any]]): + with open(self.file_path, "w") as file: + json.dump(logs, file, indent=2, cls=CrewJSONEncoder) + def reset(self): """Reset the JSON file by creating an empty file.""" with open(self.file_path, "w") as f: diff --git a/tests/crew_test.py b/tests/crew_test.py index 1a6c053d5..5a6215722 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -15,6 +15,7 @@ from crewai.crews.crew_output import CrewOutput from crewai.memory.contextual.contextual_memory import ContextualMemory from crewai.process import Process from crewai.task import Task +from crewai.tasks.output_format import OutputFormat from crewai.tasks.task_output import TaskOutput from crewai.utilities import Logger, RPMController @@ -1827,17 +1828,17 @@ def test_replay_feature(): with patch.object(Task, "execute_sync") as mock_execute_task: mock_execute_task.return_value = TaskOutput( description="Mock description", - raw_output="Mocked output for list of ideas", + raw="Mocked output for list of ideas", agent="Researcher", + json_dict=None, + output_format=OutputFormat.RAW, + pydantic=None, + summary="Mocked output for list of ideas", ) crew.kickoff() crew.replay_from_task(str(write.id)) # Ensure context was passed correctly - context_passed = mock_execute_task.call_args_list[1][1]["context"] - assert ( - "Mocked output for list of ideas" in context_passed - ) # ensure context passed assert mock_execute_task.call_count == 3