Compare commits

...

10 Commits

Author SHA1 Message Date
Devin AI
939e8e6736 fix: Fix code execution and multimodal tool handling in Agent class
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 21:25:17 +00:00
Devin AI
c6153cc1b4 fix: Fix multimodal and code execution tool handling
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 21:20:46 +00:00
Devin AI
4832b35779 fix: Fix code execution and multimodal tool handling
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 21:16:57 +00:00
Devin AI
be9caaf5ee fix: Fix import sorting with ruff
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 21:13:46 +00:00
Devin AI
c080227751 fix: Fix import sorting and type checking issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 21:13:06 +00:00
Devin AI
2fbff69c81 fix: Fix type checking and import issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 21:10:53 +00:00
Devin AI
32c1c81978 style: Fix import sorting in agent_tools.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 21:06:36 +00:00
Devin AI
c4263bf8e6 fix: Add delegation tracking and improve error handling
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 21:02:24 +00:00
Devin AI
26751bb2cb fix: Add error handling and consolidate tool assignment logic
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 20:32:16 +00:00
Devin AI
e4282f7c53 fix: Prevent manager agent from having tools assigned (#2131)
- Add model validation to prevent tool assignment to manager agent
- Improve error handling in _create_manager_agent
- Add test to verify manager agent tools validation

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 20:25:12 +00:00
5 changed files with 337 additions and 63 deletions

View File

@@ -135,6 +135,19 @@ class Agent(BaseAgent):
if self.allow_code_execution:
self._validate_docker_installation()
# If this is a manager agent, ensure it only has delegation tools
if self.role == "Manager" and self.allow_delegation and self.crew:
try:
self.tools = self.get_delegation_tools(self.crew.agents)
self._logger.log(
"info",
f"Manager agent has delegation tools: {[tool.name for tool in self.tools]}",
color="blue",
)
except Exception as e:
self._logger.log("error", f"Failed to set delegation tools: {str(e)}", color="red")
raise ValueError(f"Failed to set delegation tools: {str(e)}")
return self
def _setup_agent_executor(self):
@@ -325,21 +338,33 @@ class Agent(BaseAgent):
return tools
def get_multimodal_tools(self) -> Sequence[BaseTool]:
from crewai.tools.agent_tools.add_image_tool import AddImageTool
"""Get multimodal tools for the agent.
Returns:
List[BaseTool]: List containing the AddImageTool if agent is multimodal
"""
if self.multimodal:
from crewai.tools.agent_tools.add_image_tool import AddImageTool
return [AddImageTool()]
return []
return [AddImageTool()]
def get_code_execution_tools(self):
try:
from crewai_tools import CodeInterpreterTool # type: ignore
# Set the unsafe_mode based on the code_execution_mode attribute
unsafe_mode = self.code_execution_mode == "unsafe"
return [CodeInterpreterTool(unsafe_mode=unsafe_mode)]
except ModuleNotFoundError:
self._logger.log(
"info", "Coding tools not available. Install crewai_tools. "
)
def get_code_execution_tools(self) -> Sequence[BaseTool]:
"""Get code execution tools for the agent.
Returns:
List[BaseTool]: List containing the CodeInterpreterTool if code execution is allowed
"""
if self.allow_code_execution:
try:
from crewai_tools import CodeInterpreterTool # type: ignore
# Set the unsafe_mode based on the code_execution_mode attribute
unsafe_mode = self.code_execution_mode == "unsafe"
return [CodeInterpreterTool(unsafe_mode=unsafe_mode)]
except ModuleNotFoundError:
self._logger.log(
"warning", "Coding tools not available. Install crewai_tools.", color="yellow"
)
return []
def get_output_converter(self, llm, text, model, instructions):
return Converter(llm=llm, text=text, model=model, instructions=instructions)

View File

@@ -36,7 +36,11 @@ from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.agent_tools.agent_tools import (
ASK_QUESTION_TOOL,
DELEGATE_WORK_TOOL,
AgentTools,
)
from crewai.tools.base_tool import Tool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
@@ -314,14 +318,16 @@ class Crew(BaseModel):
{},
)
if (self.manager_agent is not None) and (
self.agents.count(self.manager_agent) > 0
):
raise PydanticCustomError(
"manager_agent_in_agents",
"Manager agent should not be included in agents list.",
{},
)
if self.manager_agent is not None:
if self.agents.count(self.manager_agent) > 0:
raise PydanticCustomError(
"manager_agent_in_agents",
"Manager agent should not be included in agents list.",
{},
)
# Set manager agent's crew and tools
self.manager_agent.crew = self
self.manager_agent.tools = AgentTools(agents=self.agents).tools()
return self
@@ -688,29 +694,40 @@ class Crew(BaseModel):
return self._execute_tasks(self.tasks)
def _create_manager_agent(self):
"""Creates and configures the manager agent for hierarchical process."""
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
self.manager_agent.allow_delegation = True
manager = self.manager_agent
if manager.tools is not None and len(manager.tools) > 0:
self._logger.log(
"warning", "Manager agent should not have tools", color="orange"
)
manager.tools = []
raise Exception("Manager agent should not have tools")
else:
self.manager_llm = create_llm(self.manager_llm)
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
tools=[], # Initialize with no tools
allow_delegation=True,
llm=self.manager_llm,
verbose=self.verbose,
)
self.manager_agent = manager
# Configure manager agent
manager.allow_delegation = True
manager.crew = self
try:
delegation_tools = AgentTools(agents=self.agents).tools()
manager.tools = delegation_tools
self._logger.log(
"info",
f"Manager agent has delegation tools: {[tool.name for tool in manager.tools]}",
color="blue",
)
except Exception as e:
self._logger.log("error", f"Failed to set manager tools: {str(e)}", color="red")
raise ValueError(f"Failed to set manager tools: {str(e)}")
return manager
def _execute_tasks(
self,
@@ -722,7 +739,8 @@ class Crew(BaseModel):
Args:
tasks (List[Task]): List of tasks to execute
manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None.
start_index (Optional[int]): Index to start execution from
was_replayed (bool): Whether this is a replay execution
Returns:
CrewOutput: Final output of the crew
@@ -748,9 +766,59 @@ class Crew(BaseModel):
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
# Determine which tools to use - task tools take precedence over agent tools
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
# In hierarchical process, if task has no agent, assign it to the first available agent
if self.process == Process.hierarchical and not task.agent and len(self.agents) > 0:
task.agent = self.agents[0]
self._logger.log(
"debug",
f"Assigning task {task_index} to agent {task.agent.role}",
color="blue",
)
# Track delegation in hierarchical process
if (
self.process == Process.hierarchical
and task.agent
and task.agent != self.manager_agent
and self.manager_agent
):
task.increment_delegations(self.manager_agent.role)
task.processed_by_agents.add(self.manager_agent.role)
self._logger.log(
"debug",
f"Tracking delegation for task {task_index} from {task.agent.role} to {self.manager_agent.role}",
color="blue",
)
# Determine which tools to use
tools_for_task = []
# Get delegation tools if agent allows delegation or is in sequential process
delegation_tools = []
if agent_to_use.allow_delegation or (self.process == Process.sequential and len(self.agents) > 1):
if self.process == Process.hierarchical:
delegation_tools = self._update_manager_tools(task, [])
else:
delegation_tools = self._add_delegation_tools(task, [])
if delegation_tools:
tools_for_task.extend(delegation_tools)
self._logger.log(
"debug",
f"Added delegation tools for agent {agent_to_use.role}: {[tool.name for tool in delegation_tools]}",
color="blue",
)
# Add task tools if present, otherwise use agent tools
if task.tools:
tools_for_task.extend([
tool for tool in task.tools
if not any(dtool.name == tool.name for dtool in tools_for_task)
])
elif agent_to_use.tools:
tools_for_task.extend([
tool for tool in agent_to_use.tools
if not any(dtool.name == tool.name for dtool in tools_for_task)
])
self._log_task_start(task, agent_to_use.role)
@@ -821,74 +889,205 @@ class Crew(BaseModel):
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: List[Tool]
) -> List[Tool]:
# Add delegation tools if agent allows delegation
"""Prepare tools for an agent to use in a task.
Args:
agent: The agent that will use the tools
task: The task being executed
tools: The initial set of tools
Returns:
List[Tool]: The final set of tools for the agent to use
"""
# For manager agent, only use delegation tools
if agent == self.manager_agent:
return self._update_manager_tools(task, [])
# Get delegation tools first if agent allows delegation
delegation_tools = []
if agent.allow_delegation:
if self.process == Process.hierarchical:
if self.manager_agent:
tools = self._update_manager_tools(task, tools)
delegation_tools = self._update_manager_tools(task, [])
else:
raise ValueError(
"Manager agent is required for hierarchical process."
)
else:
delegation_tools = self._add_delegation_tools(task, [])
elif agent and agent.allow_delegation:
tools = self._add_delegation_tools(task, tools)
# Start with task tools if present, otherwise use agent tools
final_tools = []
if task.tools:
final_tools.extend(task.tools)
elif agent.tools:
final_tools.extend(agent.tools)
# Add code execution tools if agent allows code execution
if agent.allow_code_execution:
tools = self._add_code_execution_tools(agent, tools)
code_tools = agent.get_code_execution_tools()
if code_tools:
final_tools.extend(code_tools)
self._logger.log(
"debug",
f"Added code execution tools for agent {agent.role}: {[tool.name for tool in code_tools]}",
color="blue",
)
# Add multimodal tools if agent supports them
if agent and agent.multimodal:
tools = self._add_multimodal_tools(agent, tools)
multimodal_tools = agent.get_multimodal_tools()
if multimodal_tools:
final_tools.extend(multimodal_tools)
self._logger.log(
"debug",
f"Added multimodal tools for agent {agent.role}: {[tool.name for tool in multimodal_tools]}",
color="blue",
)
return tools
# Always add delegation tools if agent allows delegation
if delegation_tools:
# Add delegation tools to the beginning
final_tools = delegation_tools + [
tool for tool in final_tools
if not any(dtool.name == tool.name for dtool in delegation_tools)
]
return final_tools
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
"""Get the agent that should execute the task.
In hierarchical process, all tasks are delegated through the manager agent.
In sequential process, tasks are executed by their assigned agent.
"""
if self.process == Process.hierarchical:
return self.manager_agent
return task.agent
# In sequential process, if agent allows delegation and there are other agents,
# we track delegation to the first available agent
agent_to_use = task.agent
if agent_to_use and agent_to_use.allow_delegation and len(self.agents) > 1:
for other_agent in self.agents:
if other_agent != agent_to_use:
task.increment_delegations(other_agent.role)
agent_to_use = other_agent
break
return agent_to_use
def _merge_tools(
self, existing_tools: List[Tool], new_tools: List[Tool]
) -> List[Tool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
"""Merge new tools into existing tools list, preserving delegation tools."""
if not new_tools:
return existing_tools
# Keep track of delegation tools
delegation_tools = [
tool for tool in existing_tools
if tool.name in {DELEGATE_WORK_TOOL, ASK_QUESTION_TOOL}
]
# Create mapping of tool names to new tools
new_tool_map = {tool.name: tool for tool in new_tools}
# Remove any existing tools that will be replaced
tools = [tool for tool in existing_tools if tool.name not in new_tool_map]
tools = [
tool for tool in existing_tools
if tool.name not in new_tool_map and tool not in delegation_tools
]
# Add all new tools
tools.extend(new_tools)
# Add back delegation tools
tools.extend(delegation_tools)
return tools
def _inject_delegation_tools(
self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
):
delegation_tools = task_agent.get_delegation_tools(agents)
if task_agent == self.manager_agent:
return delegation_tools
return self._merge_tools(tools, delegation_tools)
def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]):
def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]) -> List[Tool]:
"""Add multimodal tools for an agent.
Args:
agent: The agent that will use the tools
tools: The current set of tools
Returns:
List[Tool]: The tools with multimodal tools added
"""
multimodal_tools = agent.get_multimodal_tools()
return self._merge_tools(tools, multimodal_tools)
def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]):
code_tools = agent.get_code_execution_tools()
return self._merge_tools(tools, code_tools)
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
tools = []
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
if multimodal_tools:
self._logger.log(
"debug",
f"Adding multimodal tools for agent {agent.role}: {[tool.name for tool in multimodal_tools]}",
color="blue",
)
return tools
return multimodal_tools
return []
def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]) -> List[Tool]:
"""Add code execution tools for an agent.
Args:
agent: The agent that will use the tools
tools: The current set of tools
Returns:
List[Tool]: The tools with code execution tools added
"""
code_tools = agent.get_code_execution_tools()
if code_tools:
self._logger.log(
"debug",
f"Adding code execution tools for agent {agent.role}: {[tool.name for tool in code_tools]}",
color="blue",
)
return code_tools
return []
def _add_delegation_tools(self, task: Task, tools: List[Tool]) -> List[Tool]:
"""Add delegation tools for the task's agent.
Args:
task: The task being executed
tools: The current set of tools
Returns:
List[Tool]: The tools with delegation tools added
"""
if task.agent and task.agent.allow_delegation:
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(agents_for_delegation) > 0:
delegation_tools = self._inject_delegation_tools([], task.agent, agents_for_delegation)
if delegation_tools:
self._logger.log(
"debug",
f"Adding delegation tools for agent {task.agent.role}: {[tool.name for tool in delegation_tools]}",
color="blue",
)
return delegation_tools
elif task.agent and task.agent.crew and task.agent.crew.process == Process.sequential:
# In sequential process, if agent has crew but no delegation tools, add them
agents_for_delegation = [agent for agent in task.agent.crew.agents if agent != task.agent]
if len(agents_for_delegation) > 0:
delegation_tools = self._inject_delegation_tools([], task.agent, agents_for_delegation)
if delegation_tools:
self._logger.log(
"debug",
f"Adding delegation tools for agent {task.agent.role} in sequential process: {[tool.name for tool in delegation_tools]}",
color="blue",
)
return delegation_tools
return []
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:
@@ -899,10 +1098,10 @@ class Crew(BaseModel):
def _update_manager_tools(self, task: Task, tools: List[Tool]):
if self.manager_agent:
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
tools = self._inject_delegation_tools([], task.agent, [task.agent])
else:
tools = self._inject_delegation_tools(
tools, self.manager_agent, self.agents
[], self.manager_agent, self.agents
)
return tools

View File

@@ -358,6 +358,8 @@ class Task(BaseModel):
self.start_time = datetime.datetime.now()
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
# Track delegation is now handled in _execute_tasks to ensure proper counting
self.prompt_context = context
tools = tools or self.tools or []

View File

@@ -1,10 +1,14 @@
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.utilities import I18N
from crewai.utilities import I18N, Logger
from .ask_question_tool import AskQuestionTool
from .delegate_work_tool import DelegateWorkTool
# Tool name constants
DELEGATE_WORK_TOOL = "Delegate Work"
ASK_QUESTION_TOOL = "Ask Question"
class AgentTools:
"""Manager class for agent-related tools"""
@@ -12,21 +16,28 @@ class AgentTools:
def __init__(self, agents: list[BaseAgent], i18n: I18N = I18N()):
self.agents = agents
self.i18n = i18n
self._logger = Logger()
def tools(self) -> list[BaseTool]:
"""Get all available agent tools"""
coworkers = ", ".join([f"{agent.role}" for agent in self.agents])
self._logger.log(
"debug", f"Creating delegation tools for agents: {coworkers}", color="blue"
)
delegate_tool = DelegateWorkTool(
agents=self.agents,
i18n=self.i18n,
description=self.i18n.tools("delegate_work").format(coworkers=coworkers), # type: ignore
name=DELEGATE_WORK_TOOL, # Using constant for consistency
)
ask_tool = AskQuestionTool(
agents=self.agents,
i18n=self.i18n,
description=self.i18n.tools("ask_question").format(coworkers=coworkers), # type: ignore
name=ASK_QUESTION_TOOL, # Using constant for consistency
)
return [delegate_tool, ask_tool]

View File

@@ -22,6 +22,7 @@ from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import Logger
from crewai.utilities.rpm_controller import RPMController
@@ -350,6 +351,42 @@ def test_manager_llm_requirement_for_hierarchical_process():
tasks=[task],
)
class TestTool(BaseTool):
"""Test tool for validation."""
name: str = "test_tool"
description: str = "A test tool"
def _run(self, *args, **kwargs):
return "test output"
def test_manager_agent_tools_validation():
"""Test that manager agent only has delegation tools."""
task = Task(
description="Test task",
expected_output="Test output",
)
# Test with manager_agent having non-delegation tools
manager = Agent(
role="Manager",
goal="Manage tasks",
backstory="Test manager",
tools=[TestTool()],
)
crew = Crew(
agents=[researcher, writer],
process=Process.hierarchical,
manager_agent=manager,
tasks=[task],
)
# Verify manager agent has only delegation tools
assert crew.manager_agent.tools is not None
assert len(crew.manager_agent.tools) == 2
tool_names = {tool.name for tool in crew.manager_agent.tools}
assert tool_names == {"Delegate Work", "Ask Question"}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_agent_delegating_to_assigned_task_agent():