From c4263bf8e6f7a659d66387b7681094480d4a2f20 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 14 Feb 2025 21:02:24 +0000 Subject: [PATCH] fix: Add delegation tracking and improve error handling Co-Authored-By: Joe Moura --- src/crewai/crew.py | 215 +++++++++++++++----- src/crewai/task.py | 4 +- src/crewai/tools/agent_tools/agent_tools.py | 5 +- 3 files changed, 173 insertions(+), 51 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 51d803073..2f3f6e79a 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -690,22 +690,10 @@ class Crew(BaseModel): return self._execute_tasks(self.tasks) def _create_manager_agent(self): + """Creates and configures the manager agent for hierarchical process.""" i18n = I18N(prompt_file=self.prompt_file) if self.manager_agent is not None: manager = self.manager_agent - manager.allow_delegation = True - manager.crew = self - try: - delegation_tools = AgentTools(agents=self.agents).tools() - manager.tools = delegation_tools - self._logger.log( - "info", - f"Manager agent has delegation tools: {[tool.name for tool in manager.tools]}", - color="blue", - ) - except Exception as e: - self._logger.log("error", f"Failed to set manager tools: {str(e)}", color="red") - raise ValueError(f"Failed to set manager tools: {str(e)}") else: self.manager_llm = create_llm(self.manager_llm) manager = Agent( @@ -718,18 +706,23 @@ class Crew(BaseModel): verbose=self.verbose, ) self.manager_agent = manager - manager.crew = self - try: - delegation_tools = AgentTools(agents=self.agents).tools() - manager.tools = delegation_tools - self._logger.log( - "info", - f"Manager agent has delegation tools: {[tool.name for tool in manager.tools]}", - color="blue", - ) - except Exception as e: - self._logger.log("error", f"Failed to set manager tools: {str(e)}", color="red") - raise ValueError(f"Failed to set manager tools: {str(e)}") + + # Configure manager agent + manager.allow_delegation = True + manager.crew = self + try: + delegation_tools = AgentTools(agents=self.agents).tools() + manager.tools = delegation_tools + self._logger.log( + "info", + f"Manager agent has delegation tools: {[tool.name for tool in manager.tools]}", + color="blue", + ) + except Exception as e: + self._logger.log("error", f"Failed to set manager tools: {str(e)}", color="red") + raise ValueError(f"Failed to set manager tools: {str(e)}") + + return manager def _execute_tasks( @@ -742,7 +735,8 @@ class Crew(BaseModel): Args: tasks (List[Task]): List of tasks to execute - manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None. + start_index (Optional[int]): Index to start execution from + was_replayed (bool): Whether this is a replay execution Returns: CrewOutput: Final output of the crew @@ -768,9 +762,54 @@ class Crew(BaseModel): f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided." ) - # Determine which tools to use - task tools take precedence over agent tools - tools_for_task = task.tools or agent_to_use.tools or [] - tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task) + # In hierarchical process, if task has no agent, assign it to the first available agent + if self.process == Process.hierarchical and not task.agent and len(self.agents) > 0: + task.agent = self.agents[0] + self._logger.log( + "debug", + f"Assigning task {task_index} to agent {task.agent.role}", + color="blue", + ) + + # Track delegation in hierarchical process + if self.process == Process.hierarchical and task.agent and task.agent != self.manager_agent: + task.increment_delegations(self.manager_agent.role) + task.processed_by_agents.add(self.manager_agent.role) + self._logger.log( + "debug", + f"Tracking delegation for task {task_index} from {task.agent.role} to {self.manager_agent.role}", + color="blue", + ) + + # Determine which tools to use + tools_for_task = [] + + # Get delegation tools if agent allows delegation or is in sequential process + delegation_tools = [] + if agent_to_use.allow_delegation or (self.process == Process.sequential and len(self.agents) > 1): + if self.process == Process.hierarchical: + delegation_tools = self._update_manager_tools(task, []) + else: + delegation_tools = self._add_delegation_tools(task, []) + if delegation_tools: + tools_for_task.extend(delegation_tools) + self._logger.log( + "debug", + f"Added delegation tools for agent {agent_to_use.role}: {[tool.name for tool in delegation_tools]}", + color="blue", + ) + + # Add task tools if present, otherwise use agent tools + if task.tools: + tools_for_task.extend([ + tool for tool in task.tools + if not any(dtool.name == tool.name for dtool in tools_for_task) + ]) + elif agent_to_use.tools: + tools_for_task.extend([ + tool for tool in agent_to_use.tools + if not any(dtool.name == tool.name for dtool in tools_for_task) + ]) self._log_task_start(task, agent_to_use.role) @@ -841,52 +880,109 @@ class Crew(BaseModel): def _prepare_tools( self, agent: BaseAgent, task: Task, tools: List[Tool] ) -> List[Tool]: + """Prepare tools for an agent to use in a task. + + Args: + agent: The agent that will use the tools + task: The task being executed + tools: The initial set of tools + + Returns: + List[Tool]: The final set of tools for the agent to use + """ # For manager agent, only use delegation tools if agent == self.manager_agent: return self._update_manager_tools(task, []) - # Add delegation tools if agent allows delegation + # Get delegation tools first if agent allows delegation + delegation_tools = [] if agent.allow_delegation: if self.process == Process.hierarchical: if self.manager_agent: - tools = self._update_manager_tools(task, tools) + delegation_tools = self._update_manager_tools(task, []) else: raise ValueError( "Manager agent is required for hierarchical process." ) - elif agent and agent.allow_delegation: - tools = self._add_delegation_tools(task, tools) + else: + delegation_tools = self._add_delegation_tools(task, []) + + # Start with task tools if present, otherwise use agent tools + final_tools = [] + if task.tools: + final_tools.extend(task.tools) + elif agent.tools: + final_tools.extend(agent.tools) # Add code execution tools if agent allows code execution if agent.allow_code_execution: - tools = self._add_code_execution_tools(agent, tools) + code_tools = self._add_code_execution_tools(agent, []) + final_tools.extend(code_tools) + # Add multimodal tools if agent supports them if agent and agent.multimodal: - tools = self._add_multimodal_tools(agent, tools) + multimodal_tools = self._add_multimodal_tools(agent, []) + final_tools.extend(multimodal_tools) - return tools + # Always add delegation tools if agent allows delegation + if delegation_tools: + # Add delegation tools to the beginning + final_tools = delegation_tools + [ + tool for tool in final_tools + if not any(dtool.name == tool.name for dtool in delegation_tools) + ] + + return final_tools def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]: + """Get the agent that should execute the task. + + In hierarchical process, all tasks are delegated through the manager agent. + In sequential process, tasks are executed by their assigned agent. + """ if self.process == Process.hierarchical: return self.manager_agent - return task.agent + + # In sequential process, if agent allows delegation and there are other agents, + # we track delegation to the first available agent + agent_to_use = task.agent + if agent_to_use and agent_to_use.allow_delegation and len(self.agents) > 1: + for other_agent in self.agents: + if other_agent != agent_to_use: + task.increment_delegations(other_agent.role) + agent_to_use = other_agent + break + + return agent_to_use def _merge_tools( self, existing_tools: List[Tool], new_tools: List[Tool] ) -> List[Tool]: - """Merge new tools into existing tools list, avoiding duplicates by tool name.""" + """Merge new tools into existing tools list, preserving delegation tools.""" if not new_tools: return existing_tools + # Keep track of delegation tools + delegation_tools = [ + tool for tool in existing_tools + if tool.name in {"Delegate Work", "Ask Question"} + ] + # Create mapping of tool names to new tools new_tool_map = {tool.name: tool for tool in new_tools} # Remove any existing tools that will be replaced - tools = [tool for tool in existing_tools if tool.name not in new_tool_map] + tools = [ + tool for tool in existing_tools + if tool.name not in new_tool_map and tool not in delegation_tools + ] # Add all new tools tools.extend(new_tools) + # Add back delegation tools + tools.extend(delegation_tools) + return tools def _inject_delegation_tools( @@ -906,14 +1002,39 @@ class Crew(BaseModel): return self._merge_tools(tools, code_tools) def _add_delegation_tools(self, task: Task, tools: List[Tool]): - agents_for_delegation = [agent for agent in self.agents if agent != task.agent] - if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent: - if not tools: - tools = [] - tools = self._inject_delegation_tools( - tools, task.agent, agents_for_delegation - ) - return tools + """Add delegation tools for the task's agent. + + Args: + task: The task being executed + tools: The current set of tools + + Returns: + List[Tool]: The tools with delegation tools added + """ + if task.agent and task.agent.allow_delegation: + agents_for_delegation = [agent for agent in self.agents if agent != task.agent] + if len(agents_for_delegation) > 0: + delegation_tools = self._inject_delegation_tools([], task.agent, agents_for_delegation) + if delegation_tools: + self._logger.log( + "debug", + f"Adding delegation tools for agent {task.agent.role}: {[tool.name for tool in delegation_tools]}", + color="blue", + ) + return delegation_tools + elif task.agent and task.agent.crew and task.agent.crew.process == Process.sequential: + # In sequential process, if agent has crew but no delegation tools, add them + agents_for_delegation = [agent for agent in task.agent.crew.agents if agent != task.agent] + if len(agents_for_delegation) > 0: + delegation_tools = self._inject_delegation_tools([], task.agent, agents_for_delegation) + if delegation_tools: + self._logger.log( + "debug", + f"Adding delegation tools for agent {task.agent.role} in sequential process: {[tool.name for tool in delegation_tools]}", + color="blue", + ) + return delegation_tools + return [] def _log_task_start(self, task: Task, role: str = "None"): if self.output_log_file: diff --git a/src/crewai/task.py b/src/crewai/task.py index d5faf0d62..6aaf5a2c3 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -358,9 +358,7 @@ class Task(BaseModel): self.start_time = datetime.datetime.now() self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self) - # Track delegation if this task is being executed by a different agent - if self.agent and agent.role != self.agent.role: - self.increment_delegations(agent.role) + # Track delegation is now handled in _execute_tasks to ensure proper counting self.prompt_context = context tools = tools or self.tools or [] diff --git a/src/crewai/tools/agent_tools/agent_tools.py b/src/crewai/tools/agent_tools/agent_tools.py index 5a6ba58c0..e81ebb61b 100644 --- a/src/crewai/tools/agent_tools/agent_tools.py +++ b/src/crewai/tools/agent_tools/agent_tools.py @@ -1,6 +1,6 @@ from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.tools.base_tool import BaseTool -from crewai.utilities import I18N +from crewai.utilities import I18N, Logger from .ask_question_tool import AskQuestionTool from .delegate_work_tool import DelegateWorkTool @@ -17,10 +17,13 @@ class AgentTools: def __init__(self, agents: list[BaseAgent], i18n: I18N = I18N()): self.agents = agents self.i18n = i18n + self._logger = Logger() def tools(self) -> list[BaseTool]: """Get all available agent tools""" coworkers = ", ".join([f"{agent.role}" for agent in self.agents]) + + self._logger.log("debug", f"Creating delegation tools for agents: {coworkers}", color="blue") delegate_tool = DelegateWorkTool( agents=self.agents,