From c7bf609e18f8cf801b5c9e29c53ab35c6f097ea6 Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Thu, 11 Jul 2024 11:14:43 -0700 Subject: [PATCH] refactoring for cleaner code --- src/crewai/crew.py | 265 ++++++++++++++++++--------------------------- 1 file changed, 108 insertions(+), 157 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 44c8990a2..f871ffdca 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -539,7 +539,7 @@ class Crew(BaseModel): "inputs": inputs, "was_replayed": was_replayed, } - # Update the existing log or append if it's a new entry + if task_index < len(self.execution_logs): self.execution_logs[task_index] = log else: @@ -548,92 +548,113 @@ class Crew(BaseModel): def _run_sequential_process(self) -> CrewOutput: """Executes tasks sequentially and returns the final output.""" - self.execution_logs = [] return self._execute_tasks(self.tasks) + def _run_hierarchical_process(self) -> CrewOutput: + """Creates and assigns a manager agent to make sure the crew completes the tasks.""" + self._create_manager_agent() + return self._execute_tasks(self.tasks, self.manager_agent) + + def _create_manager_agent(self): + i18n = I18N(prompt_file=self.prompt_file) + if self.manager_agent is not None: + self.manager_agent.allow_delegation = True + manager = self.manager_agent + if manager.tools is not None and len(manager.tools) > 0: + raise Exception("Manager agent should not have tools") + manager.tools = self.manager_agent.get_delegation_tools(self.agents) + else: + manager = Agent( + role=i18n.retrieve("hierarchical_manager_agent", "role"), + goal=i18n.retrieve("hierarchical_manager_agent", "goal"), + backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), + tools=AgentTools(agents=self.agents).tools(), + llm=self.manager_llm, + verbose=self.verbose, + ) + self.manager_agent = manager + def _execute_tasks( self, tasks: List[Task], manager: Optional[BaseAgent] = None, ) -> CrewOutput: + """Executes tasks sequentially and returns the final output. + + Args: + tasks (List[Task]): List of tasks to execute + manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None. + + Returns: + CrewOutput: Final output of the crew + """ task_outputs: List[TaskOutput] = [] futures: List[Tuple[Task, Future[TaskOutput], int]] = [] + self.execution_logs = [] for task_index, task in enumerate(tasks): - if task.agent and task.agent.allow_delegation: - agents_for_delegation = [ - agent for agent in self.agents if agent != task.agent - ] - if len(self.agents) > 1 and len(agents_for_delegation) > 0: - task.tools += task.agent.get_delegation_tools(agents_for_delegation) - - if self.process == Process.hierarchical: - if task.agent and manager: - manager.tools = task.agent.get_delegation_tools([task.agent]) - if manager: - manager.tools = manager.get_delegation_tools(self.agents) - + self._prepare_task(task, manager) agent_to_use = task.agent if task.agent else manager - role = agent_to_use.role if agent_to_use is not None else "None" - - self._logger.log("debug", f"Working Agent: {role}", color="bold_purple") - self._logger.log( - "info", - f"Starting Task: {task.description}", - color="bold_purple", - ) - - if self.output_log_file: - self._file_handler.log( - agent=role, task=task.description, status="started" + if agent_to_use is None: + raise ValueError( + f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided." ) + self._log_task_start(task, agent_to_use) + if task.async_execution: context = self._set_context(task, task_outputs) - if agent_to_use: - future = task.execute_async( - agent=agent_to_use, - context=context, - tools=agent_to_use.tools, - ) - futures.append((task, future, task_index)) - else: # sequential async - self._logger.log( - "warning", f"No agent available for task: {task.description}" - ) - - else: # sync execution + future = task.execute_async( + agent=agent_to_use, + context=context, + tools=agent_to_use.tools, + ) + futures.append((task, future, task_index)) + else: if futures: task_outputs = self._process_async_tasks(futures) futures.clear() context = self._set_context(task, task_outputs) - if agent_to_use: - task_output = task.execute_sync( - agent=agent_to_use, - context=context, - tools=agent_to_use.tools, - ) - task_outputs = [task_output] - self._process_task_result(task, task_output) - self._store_execution_log(task, task_output, task_index) + task_output = task.execute_sync( + agent=agent_to_use, + context=context, + tools=agent_to_use.tools, + ) + task_outputs = [task_output] + self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, task_index) if futures: task_outputs = self._process_async_tasks(futures) - final_task_output = task_outputs[0] + return self._create_crew_output(task_outputs) - final_string_output = final_task_output.raw - self._finish_execution(final_string_output) + def _prepare_task(self, task: Task, manager: Optional[BaseAgent]): + if task.agent and task.agent.allow_delegation: + self._add_delegation_tools(task) + if self.process == Process.hierarchical: + self._update_manager_tools(task, manager) - token_usage = self.calculate_usage_metrics() + def _add_delegation_tools(self, task: Task): + agents_for_delegation = [agent for agent in self.agents if agent != task.agent] + if len(self.agents) > 1 and agents_for_delegation: + task.tools += task.agent.get_delegation_tools( + agents_for_delegation + ) # TODO: FIX TYPE ERROR HERE - return CrewOutput( - raw=final_task_output.raw, - pydantic=final_task_output.pydantic, - json_dict=final_task_output.json_dict, - tasks_output=[task.output for task in self.tasks if task.output], - token_usage=token_usage, - ) - # return task_outputs + def _log_task_start( + self, task: Task, agent: Optional[BaseAgent], color: str = "bold_purple" + ): + role = agent.role if agent else "None" + self._logger.log("debug", f"Working Agent: {role}", color=color) + self._logger.log("info", f"Starting Task: {task.description}", color=color) + if self.output_log_file: + self._file_handler.log(agent=role, task=task.description, status="started") + + def _update_manager_tools(self, task: Task, manager: Optional[BaseAgent]): + if task.agent and manager: + manager.tools = task.agent.get_delegation_tools([task.agent]) + if manager: + manager.tools = manager.get_delegation_tools(self.agents) def _set_context(self, task: Task, task_outputs: List[TaskOutput]): context = ( @@ -649,6 +670,24 @@ class Crew(BaseModel): if self.output_log_file: self._file_handler.log(agent=role, task=output, status="completed") + def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput: + if len(task_outputs) != 1: + raise ValueError( + "Something went wrong. Kickoff should return only one task output." + ) + final_task_output = task_outputs[0] + final_string_output = final_task_output.raw + self._finish_execution(final_string_output) + token_usage = self.calculate_usage_metrics() + + return CrewOutput( + raw=final_task_output.raw, + pydantic=final_task_output.pydantic, + json_dict=final_task_output.json_dict, + tasks_output=[task.output for task in self.tasks if task.output], + token_usage=token_usage, + ) + def _process_async_tasks( self, futures: List[Tuple[Task, Future[TaskOutput], int]], @@ -679,16 +718,13 @@ class Crew(BaseModel): def replay_from_task( self, task_id: str, inputs: Dict[str, Any] | None = None ) -> CrewOutput: - # stored_outputs = self._load_stored_outputs() stored_outputs = TaskOutputJsonHandler(CREW_TASKS_OUTPUT_FILE).load() start_index = self._find_task_index(task_id, stored_outputs) if start_index is None: raise ValueError(f"Task with id {task_id} not found in the crew's tasks.") - task_outputs: List[ - TaskOutput - ] = [] # will propogate the old outputs first to add context then fill the content with the new task outputs relative to the replay start + task_outputs: List[TaskOutput] = [] futures: List[Tuple[Task, Future[TaskOutput], int]] = [] # inputs can be overrided with new passed inputs @@ -717,41 +753,14 @@ class Crew(BaseModel): self.tasks[task_index].output = task_output task_outputs = [task_output] else: - if task.agent and task.agent.allow_delegation: - agents_for_delegation = [ - agent for agent in self.agents if agent != task.agent - ] - if len(self.agents) > 1 and len(agents_for_delegation) > 0: - task.tools += task.agent.get_delegation_tools( - agents_for_delegation - ) - - if self.process == Process.hierarchical: - if task.agent and self.manager_agent: - self.manager_agent.tools = task.agent.get_delegation_tools( - [task.agent] - ) - if self.manager_agent: - self.manager_agent.tools = ( - self.manager_agent.get_delegation_tools(self.agents) - ) + self._prepare_task(task, self.manager_agent) agent_to_use = task.agent if task.agent else self.manager_agent - role = agent_to_use.role if agent_to_use is not None else "None" - log_color = "bold_blue" - self._logger.log( - "debug", f"Replaying Working Agent: {role}", color=log_color - ) - self._logger.log( - "info", - f"Replaying Task: {task.description}", - color=log_color, - ) - - if self.output_log_file: - self._file_handler.log( - agent=role, task=task.description, status="started" + if agent_to_use is None: + raise ValueError( + f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided." ) - # Execute task for replay and subsequent tasks + self._log_task_start(task, agent_to_use, "bold_blue") + if task.async_execution: context = self._set_context(task, task_outputs) future = task.execute_async( @@ -778,65 +787,7 @@ class Crew(BaseModel): if futures: task_outputs = self._process_async_tasks(futures, True) - if len(task_outputs) != 1: - raise ValueError( - "Something went wrong. Kickoff should return only one task output." - ) - final_task_output = task_outputs[0] - final_string_output = final_task_output.raw - self._finish_execution(final_string_output) - - token_usage = self.calculate_usage_metrics() - - return CrewOutput( - raw=final_task_output.raw, - pydantic=final_task_output.pydantic, - json_dict=final_task_output.json_dict, - tasks_output=[task.output for task in self.tasks if task.output], - token_usage=token_usage, - ) - - def _create_manager_agent(self): - i18n = I18N(prompt_file=self.prompt_file) - if self.manager_agent is not None: - self.manager_agent.allow_delegation = True - manager = self.manager_agent - if manager.tools is not None and len(manager.tools) > 0: - raise Exception("Manager agent should not have tools") - manager.tools = self.manager_agent.get_delegation_tools(self.agents) - else: - manager = Agent( - role=i18n.retrieve("hierarchical_manager_agent", "role"), - goal=i18n.retrieve("hierarchical_manager_agent", "goal"), - backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), - tools=AgentTools(agents=self.agents).tools(), - llm=self.manager_llm, - verbose=self.verbose, - ) - self.manager_agent = manager - - def _run_hierarchical_process(self) -> CrewOutput: - """Creates and assigns a manager agent to make sure the crew completes the tasks.""" - self.execution_logs = [] - i18n = I18N(prompt_file=self.prompt_file) - if self.manager_agent is not None: - self.manager_agent.allow_delegation = True - manager = self.manager_agent - if manager.tools is not None and len(manager.tools) > 0: - raise Exception("Manager agent should not have tools") - manager.tools = self.manager_agent.get_delegation_tools(self.agents) - else: - manager = Agent( - role=i18n.retrieve("hierarchical_manager_agent", "role"), - goal=i18n.retrieve("hierarchical_manager_agent", "goal"), - backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), - tools=AgentTools(agents=self.agents).tools(), - llm=self.manager_llm, - verbose=self.verbose, - ) - self.manager_agent = manager - - return self._execute_tasks(self.tasks, manager) + return self._create_crew_output(task_outputs) def copy(self): """Create a deep copy of the Crew."""