fix: Add delegation tracking and improve error handling

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-02-14 21:02:24 +00:00
parent 26751bb2cb
commit c4263bf8e6
3 changed files with 173 additions and 51 deletions

View File

@@ -690,22 +690,10 @@ class Crew(BaseModel):
return self._execute_tasks(self.tasks)
def _create_manager_agent(self):
"""Creates and configures the manager agent for hierarchical process."""
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
manager = self.manager_agent
manager.allow_delegation = True
manager.crew = self
try:
delegation_tools = AgentTools(agents=self.agents).tools()
manager.tools = delegation_tools
self._logger.log(
"info",
f"Manager agent has delegation tools: {[tool.name for tool in manager.tools]}",
color="blue",
)
except Exception as e:
self._logger.log("error", f"Failed to set manager tools: {str(e)}", color="red")
raise ValueError(f"Failed to set manager tools: {str(e)}")
else:
self.manager_llm = create_llm(self.manager_llm)
manager = Agent(
@@ -718,18 +706,23 @@ class Crew(BaseModel):
verbose=self.verbose,
)
self.manager_agent = manager
manager.crew = self
try:
delegation_tools = AgentTools(agents=self.agents).tools()
manager.tools = delegation_tools
self._logger.log(
"info",
f"Manager agent has delegation tools: {[tool.name for tool in manager.tools]}",
color="blue",
)
except Exception as e:
self._logger.log("error", f"Failed to set manager tools: {str(e)}", color="red")
raise ValueError(f"Failed to set manager tools: {str(e)}")
# Configure manager agent
manager.allow_delegation = True
manager.crew = self
try:
delegation_tools = AgentTools(agents=self.agents).tools()
manager.tools = delegation_tools
self._logger.log(
"info",
f"Manager agent has delegation tools: {[tool.name for tool in manager.tools]}",
color="blue",
)
except Exception as e:
self._logger.log("error", f"Failed to set manager tools: {str(e)}", color="red")
raise ValueError(f"Failed to set manager tools: {str(e)}")
return manager
def _execute_tasks(
@@ -742,7 +735,8 @@ class Crew(BaseModel):
Args:
tasks (List[Task]): List of tasks to execute
manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None.
start_index (Optional[int]): Index to start execution from
was_replayed (bool): Whether this is a replay execution
Returns:
CrewOutput: Final output of the crew
@@ -768,9 +762,54 @@ class Crew(BaseModel):
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
# Determine which tools to use - task tools take precedence over agent tools
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
# In hierarchical process, if task has no agent, assign it to the first available agent
if self.process == Process.hierarchical and not task.agent and len(self.agents) > 0:
task.agent = self.agents[0]
self._logger.log(
"debug",
f"Assigning task {task_index} to agent {task.agent.role}",
color="blue",
)
# Track delegation in hierarchical process
if self.process == Process.hierarchical and task.agent and task.agent != self.manager_agent:
task.increment_delegations(self.manager_agent.role)
task.processed_by_agents.add(self.manager_agent.role)
self._logger.log(
"debug",
f"Tracking delegation for task {task_index} from {task.agent.role} to {self.manager_agent.role}",
color="blue",
)
# Determine which tools to use
tools_for_task = []
# Get delegation tools if agent allows delegation or is in sequential process
delegation_tools = []
if agent_to_use.allow_delegation or (self.process == Process.sequential and len(self.agents) > 1):
if self.process == Process.hierarchical:
delegation_tools = self._update_manager_tools(task, [])
else:
delegation_tools = self._add_delegation_tools(task, [])
if delegation_tools:
tools_for_task.extend(delegation_tools)
self._logger.log(
"debug",
f"Added delegation tools for agent {agent_to_use.role}: {[tool.name for tool in delegation_tools]}",
color="blue",
)
# Add task tools if present, otherwise use agent tools
if task.tools:
tools_for_task.extend([
tool for tool in task.tools
if not any(dtool.name == tool.name for dtool in tools_for_task)
])
elif agent_to_use.tools:
tools_for_task.extend([
tool for tool in agent_to_use.tools
if not any(dtool.name == tool.name for dtool in tools_for_task)
])
self._log_task_start(task, agent_to_use.role)
@@ -841,52 +880,109 @@ class Crew(BaseModel):
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: List[Tool]
) -> List[Tool]:
"""Prepare tools for an agent to use in a task.
Args:
agent: The agent that will use the tools
task: The task being executed
tools: The initial set of tools
Returns:
List[Tool]: The final set of tools for the agent to use
"""
# For manager agent, only use delegation tools
if agent == self.manager_agent:
return self._update_manager_tools(task, [])
# Add delegation tools if agent allows delegation
# Get delegation tools first if agent allows delegation
delegation_tools = []
if agent.allow_delegation:
if self.process == Process.hierarchical:
if self.manager_agent:
tools = self._update_manager_tools(task, tools)
delegation_tools = self._update_manager_tools(task, [])
else:
raise ValueError(
"Manager agent is required for hierarchical process."
)
elif agent and agent.allow_delegation:
tools = self._add_delegation_tools(task, tools)
else:
delegation_tools = self._add_delegation_tools(task, [])
# Start with task tools if present, otherwise use agent tools
final_tools = []
if task.tools:
final_tools.extend(task.tools)
elif agent.tools:
final_tools.extend(agent.tools)
# Add code execution tools if agent allows code execution
if agent.allow_code_execution:
tools = self._add_code_execution_tools(agent, tools)
code_tools = self._add_code_execution_tools(agent, [])
final_tools.extend(code_tools)
# Add multimodal tools if agent supports them
if agent and agent.multimodal:
tools = self._add_multimodal_tools(agent, tools)
multimodal_tools = self._add_multimodal_tools(agent, [])
final_tools.extend(multimodal_tools)
return tools
# Always add delegation tools if agent allows delegation
if delegation_tools:
# Add delegation tools to the beginning
final_tools = delegation_tools + [
tool for tool in final_tools
if not any(dtool.name == tool.name for dtool in delegation_tools)
]
return final_tools
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
"""Get the agent that should execute the task.
In hierarchical process, all tasks are delegated through the manager agent.
In sequential process, tasks are executed by their assigned agent.
"""
if self.process == Process.hierarchical:
return self.manager_agent
return task.agent
# In sequential process, if agent allows delegation and there are other agents,
# we track delegation to the first available agent
agent_to_use = task.agent
if agent_to_use and agent_to_use.allow_delegation and len(self.agents) > 1:
for other_agent in self.agents:
if other_agent != agent_to_use:
task.increment_delegations(other_agent.role)
agent_to_use = other_agent
break
return agent_to_use
def _merge_tools(
self, existing_tools: List[Tool], new_tools: List[Tool]
) -> List[Tool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
"""Merge new tools into existing tools list, preserving delegation tools."""
if not new_tools:
return existing_tools
# Keep track of delegation tools
delegation_tools = [
tool for tool in existing_tools
if tool.name in {"Delegate Work", "Ask Question"}
]
# Create mapping of tool names to new tools
new_tool_map = {tool.name: tool for tool in new_tools}
# Remove any existing tools that will be replaced
tools = [tool for tool in existing_tools if tool.name not in new_tool_map]
tools = [
tool for tool in existing_tools
if tool.name not in new_tool_map and tool not in delegation_tools
]
# Add all new tools
tools.extend(new_tools)
# Add back delegation tools
tools.extend(delegation_tools)
return tools
def _inject_delegation_tools(
@@ -906,14 +1002,39 @@ class Crew(BaseModel):
return self._merge_tools(tools, code_tools)
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
tools = []
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
)
return tools
"""Add delegation tools for the task's agent.
Args:
task: The task being executed
tools: The current set of tools
Returns:
List[Tool]: The tools with delegation tools added
"""
if task.agent and task.agent.allow_delegation:
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(agents_for_delegation) > 0:
delegation_tools = self._inject_delegation_tools([], task.agent, agents_for_delegation)
if delegation_tools:
self._logger.log(
"debug",
f"Adding delegation tools for agent {task.agent.role}: {[tool.name for tool in delegation_tools]}",
color="blue",
)
return delegation_tools
elif task.agent and task.agent.crew and task.agent.crew.process == Process.sequential:
# In sequential process, if agent has crew but no delegation tools, add them
agents_for_delegation = [agent for agent in task.agent.crew.agents if agent != task.agent]
if len(agents_for_delegation) > 0:
delegation_tools = self._inject_delegation_tools([], task.agent, agents_for_delegation)
if delegation_tools:
self._logger.log(
"debug",
f"Adding delegation tools for agent {task.agent.role} in sequential process: {[tool.name for tool in delegation_tools]}",
color="blue",
)
return delegation_tools
return []
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:

View File

@@ -358,9 +358,7 @@ class Task(BaseModel):
self.start_time = datetime.datetime.now()
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
# Track delegation if this task is being executed by a different agent
if self.agent and agent.role != self.agent.role:
self.increment_delegations(agent.role)
# Track delegation is now handled in _execute_tasks to ensure proper counting
self.prompt_context = context
tools = tools or self.tools or []

View File

@@ -1,6 +1,6 @@
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.utilities import I18N
from crewai.utilities import I18N, Logger
from .ask_question_tool import AskQuestionTool
from .delegate_work_tool import DelegateWorkTool
@@ -17,10 +17,13 @@ class AgentTools:
def __init__(self, agents: list[BaseAgent], i18n: I18N = I18N()):
self.agents = agents
self.i18n = i18n
self._logger = Logger()
def tools(self) -> list[BaseTool]:
"""Get all available agent tools"""
coworkers = ", ".join([f"{agent.role}" for agent in self.agents])
self._logger.log("debug", f"Creating delegation tools for agents: {coworkers}", color="blue")
delegate_tool = DelegateWorkTool(
agents=self.agents,