This commit is contained in:
Brandon Hancock
2025-03-13 10:23:09 -04:00
parent 41a670166a
commit f4186fad14
7 changed files with 474 additions and 32 deletions

View File

@@ -49,6 +49,7 @@ class Agent(BaseAgent):
max_rpm: Maximum number of requests per minute for the agent execution to be respected.
verbose: Whether the agent execution should be in verbose mode.
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
delegate_to: List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents.
tools: Tools at agents disposal
step_callback: Callback to be executed after each step of the agent execution.
knowledge_sources: Knowledge sources for the agent.
@@ -341,10 +342,16 @@ class Agent(BaseAgent):
callbacks=[TokenCalcHandler(self._token_process)],
)
def get_delegation_tools(self, agents: List[BaseAgent]):
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
def get_delegation_tools(self, agents: Sequence[BaseAgent]):
# If delegate_to is specified, use those agents instead of all agents
if self.delegate_to is not None:
agents_to_use = self.delegate_to
else:
agents_to_use = agents
agent_tools = AgentTools(agents=agents_to_use)
delegation_tools = agent_tools.tools()
return delegation_tools
def get_multimodal_tools(self) -> Sequence[BaseTool]:
from crewai.tools.agent_tools.add_image_tool import AddImageTool

View File

@@ -2,7 +2,7 @@ import uuid
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Dict, List, Optional, TypeVar
from typing import Any, Dict, List, Optional, Sequence, TypeVar
from pydantic import (
UUID4,
@@ -41,6 +41,7 @@ class BaseAgent(ABC, BaseModel):
verbose (bool): Verbose mode for the Agent Execution.
max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution.
allow_delegation (bool): Allow delegation of tasks to agents.
delegate_to (Optional[List["BaseAgent"]]): List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents.
tools (Optional[List[Any]]): Tools at the agent's disposal.
max_iter (int): Maximum iterations for an agent to execute a task.
agent_executor (InstanceOf): An instance of the CrewAgentExecutor class.
@@ -61,7 +62,7 @@ class BaseAgent(ABC, BaseModel):
Abstract method to create an agent executor.
_parse_tools(tools: List[BaseTool]) -> List[Any]:
Abstract method to parse tools.
get_delegation_tools(agents: List["BaseAgent"]):
get_delegation_tools(agents: Sequence["BaseAgent"]) -> List[BaseTool]:
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
get_output_converter(llm, model, instructions):
Abstract method to get the converter class for the agent to create json/pydantic outputs.
@@ -111,6 +112,10 @@ class BaseAgent(ABC, BaseModel):
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
delegate_to: Optional[List["BaseAgent"]] = Field(
default=None,
description="List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents.",
)
tools: Optional[List[BaseTool]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
@@ -248,7 +253,7 @@ class BaseAgent(ABC, BaseModel):
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
def get_delegation_tools(self, agents: Sequence["BaseAgent"]) -> List[BaseTool]:
"""Set the task tools that init BaseAgenTools class."""
pass
@@ -275,6 +280,7 @@ class BaseAgent(ABC, BaseModel):
"knowledge_sources",
"knowledge_storage",
"knowledge",
"delegate_to",
}
# Copy llm
@@ -300,6 +306,11 @@ class BaseAgent(ABC, BaseModel):
copied_source.storage = shared_storage
existing_knowledge_sources.append(copied_source)
# Copy delegate_to if it exists
existing_delegate_to = None
if self.delegate_to:
existing_delegate_to = list(self.delegate_to)
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_agent = type(self)(
@@ -309,6 +320,7 @@ class BaseAgent(ABC, BaseModel):
knowledge_sources=existing_knowledge_sources,
knowledge=copied_knowledge,
knowledge_storage=copied_knowledge_storage,
delegate_to=existing_delegate_to,
)
return copied_agent

View File

@@ -739,21 +739,25 @@ class Crew(BaseModel):
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
self.manager_agent.allow_delegation = True
# Set the delegate_to property to all agents in the crew
self.manager_agent.delegate_to = self.agents
manager = self.manager_agent
if manager.tools is not None and len(manager.tools) > 0:
self._logger.log(
"warning", "Manager agent should not have tools", color="orange"
)
manager.tools = []
raise Exception("Manager agent should not have tools")
# Instead, we ensure it has delegation tools
if not manager.allow_delegation:
manager.allow_delegation = True
else:
self.manager_llm = create_llm(self.manager_llm)
# Create delegation tools
delegation_tools = AgentTools(agents=self.agents).tools()
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
tools=delegation_tools,
allow_delegation=True,
delegate_to=self.agents,
llm=self.manager_llm,
verbose=self.verbose,
)
@@ -929,7 +933,15 @@ class Crew(BaseModel):
return self._merge_tools(tools, code_tools)
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
# If the agent has specific agents to delegate to, use those
if task.agent and task.agent.delegate_to is not None:
agents_for_delegation = task.agent.delegate_to
else:
# Otherwise use all agents except the current one
agents_for_delegation = [
agent for agent in self.agents if agent != task.agent
]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
tools = []

View File

@@ -1,3 +1,5 @@
from typing import Sequence
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.utilities import I18N
@@ -9,7 +11,7 @@ from .delegate_work_tool import DelegateWorkTool
class AgentTools:
"""Manager class for agent-related tools"""
def __init__(self, agents: list[BaseAgent], i18n: I18N = I18N()):
def __init__(self, agents: Sequence[BaseAgent], i18n: I18N = I18N()):
self.agents = agents
self.i18n = i18n

View File

@@ -1,5 +1,5 @@
import logging
from typing import Optional
from typing import Optional, Sequence
from pydantic import Field
@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class BaseAgentTool(BaseTool):
"""Base class for agent-related tools"""
agents: list[BaseAgent] = Field(description="List of available agents")
agents: Sequence[BaseAgent] = Field(description="List of available agents")
i18n: I18N = Field(
default_factory=I18N, description="Internationalization settings"
)
@@ -47,10 +47,7 @@ class BaseAgentTool(BaseTool):
return coworker
def _execute(
self,
agent_name: Optional[str],
task: str,
context: Optional[str] = None
self, agent_name: Optional[str], task: str, context: Optional[str] = None
) -> str:
"""
Execute delegation to an agent with case-insensitive and whitespace-tolerant matching.
@@ -77,33 +74,43 @@ class BaseAgentTool(BaseTool):
# when it should look like this:
# {"task": "....", "coworker": "...."}
sanitized_name = self.sanitize_agent_name(agent_name)
logger.debug(f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'")
logger.debug(
f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'"
)
available_agents = [agent.role for agent in self.agents]
logger.debug(f"Available agents: {available_agents}")
agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None")
agent = [ # type: ignore # Incompatible types in assignment (expression has type "Sequence[BaseAgent]", variable has type "str | None")
available_agent
for available_agent in self.agents
if self.sanitize_agent_name(available_agent.role) == sanitized_name
]
logger.debug(f"Found {len(agent)} matching agents for role '{sanitized_name}'")
logger.debug(
f"Found {len(agent)} matching agents for role '{sanitized_name}'"
)
except (AttributeError, ValueError) as e:
# Handle specific exceptions that might occur during role name processing
return self.i18n.errors("agent_tool_unexisting_coworker").format(
coworkers="\n".join(
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
[
f"- {self.sanitize_agent_name(agent.role)}"
for agent in self.agents
]
),
error=str(e)
error=str(e),
)
if not agent:
# No matching agent found after sanitization
return self.i18n.errors("agent_tool_unexisting_coworker").format(
coworkers="\n".join(
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
[
f"- {self.sanitize_agent_name(agent.role)}"
for agent in self.agents
]
),
error=f"No agent found with role '{sanitized_name}'"
error=f"No agent found with role '{sanitized_name}'",
)
agent = agent[0]
@@ -114,11 +121,12 @@ class BaseAgentTool(BaseTool):
expected_output=agent.i18n.slice("manager_request"),
i18n=agent.i18n,
)
logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}")
logger.debug(
f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}"
)
return agent.execute_task(task_with_assigned_agent, context)
except Exception as e:
# Handle task creation or execution errors
return self.i18n.errors("agent_tool_execution_error").format(
agent_role=self.sanitize_agent_name(agent.role),
error=str(e)
agent_role=self.sanitize_agent_name(agent.role), error=str(e)
)

View File

@@ -1797,3 +1797,169 @@ def test_litellm_anthropic_error_handling():
# Verify the LLM call was only made once (no retries)
mock_llm_call.assert_called_once()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_delegation_to_specific_agents():
"""Test that an agent can delegate to specific agents using the delegate_to property."""
# Create agents in order so we can reference them in delegate_to
agent2 = Agent(
role="Agent 2",
goal="Goal for Agent 2",
backstory="Backstory for Agent 2",
allow_delegation=True,
)
agent3 = Agent(
role="Agent 3",
goal="Goal for Agent 3",
backstory="Backstory for Agent 3",
allow_delegation=True,
)
# Create agent1 without specific delegation first to test default behavior
agent1 = Agent(
role="Agent 1",
goal="Goal for Agent 1",
backstory="Backstory for Agent 1",
allow_delegation=True,
)
# Test default behavior (delegate to all agents)
all_agents = [agent1, agent2, agent3]
delegation_tools = agent1.get_delegation_tools(all_agents)
# Verify that tools for all agents are returned
assert len(delegation_tools) == 2 # Delegate and Ask tools
# Check that the tools can delegate to all agents
delegate_tool = delegation_tools[0]
ask_tool = delegation_tools[1]
# Verify the tools description includes all agents
assert "Agent 1" in delegate_tool.description
assert "Agent 2" in delegate_tool.description
assert "Agent 3" in delegate_tool.description
assert "Agent 1" in ask_tool.description
assert "Agent 2" in ask_tool.description
assert "Agent 3" in ask_tool.description
# Test delegation to specific agents by creating a new agent with delegate_to
agent1_with_specific_delegation = Agent(
role="Agent 1",
goal="Goal for Agent 1",
backstory="Backstory for Agent 1",
allow_delegation=True,
delegate_to=[agent2], # Only delegate to agent2
)
specific_delegation_tools = agent1_with_specific_delegation.get_delegation_tools(
all_agents
)
# Verify that tools for only the specified agent are returned
assert len(specific_delegation_tools) == 2 # Delegate and Ask tools
# Check that the tools can only delegate to agent2
specific_delegate_tool = specific_delegation_tools[0]
specific_ask_tool = specific_delegation_tools[1]
# Verify the tools description includes only agent2
assert "Agent 2" in specific_delegate_tool.description
assert "Agent 1" not in specific_delegate_tool.description
assert "Agent 3" not in specific_delegate_tool.description
assert "Agent 2" in specific_ask_tool.description
assert "Agent 1" not in specific_ask_tool.description
assert "Agent 3" not in specific_ask_tool.description
def test_agent_copy_with_delegate_to():
"""Test that the delegate_to property is properly copied when an agent is copied."""
# Create agents in order so we can reference them in delegate_to
agent2 = Agent(
role="Agent 2",
goal="Goal for Agent 2",
backstory="Backstory for Agent 2",
allow_delegation=True,
)
agent3 = Agent(
role="Agent 3",
goal="Goal for Agent 3",
backstory="Backstory for Agent 3",
allow_delegation=True,
)
# Create agent1 with delegate_to set during initialization
agent1 = Agent(
role="Agent 1",
goal="Goal for Agent 1",
backstory="Backstory for Agent 1",
allow_delegation=True,
delegate_to=[agent2, agent3],
)
# Copy agent1
agent1_copy = agent1.copy()
# Verify that delegate_to is properly copied
assert agent1_copy.delegate_to is not None
assert len(agent1_copy.delegate_to) == 2
# Verify that the copied delegate_to contains the same agents
delegate_roles = [agent.role for agent in agent1_copy.delegate_to]
assert "Agent 2" in delegate_roles
assert "Agent 3" in delegate_roles
# Verify that modifying the original agent's delegate_to doesn't affect the copy
agent1.delegate_to = [agent2]
assert len(agent1_copy.delegate_to) == 2
assert len(agent1.delegate_to) == 1
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_delegation_to_all_agents():
"""Test that an agent with allow_delegation=True but without delegate_to specified can delegate to all agents."""
# Create three agents
agent1 = Agent(
role="Agent 1",
goal="Goal for Agent 1",
backstory="Backstory for Agent 1",
allow_delegation=True, # Allow delegation but don't specify delegate_to
)
agent2 = Agent(
role="Agent 2",
goal="Goal for Agent 2",
backstory="Backstory for Agent 2",
allow_delegation=True,
)
agent3 = Agent(
role="Agent 3",
goal="Goal for Agent 3",
backstory="Backstory for Agent 3",
allow_delegation=True,
)
# Get delegation tools for agent1
all_agents = [agent1, agent2, agent3]
delegation_tools = agent1.get_delegation_tools(all_agents)
# Verify that tools for all agents are returned
assert len(delegation_tools) == 2 # Delegate and Ask tools
# Check that the tools can delegate to all agents
delegate_tool = delegation_tools[0]
ask_tool = delegation_tools[1]
# Verify the tools description includes all agents
assert "Agent 1" in delegate_tool.description
assert "Agent 2" in delegate_tool.description
assert "Agent 3" in delegate_tool.description
assert "Agent 1" in ask_tool.description
assert "Agent 2" in ask_tool.description
assert "Agent 3" in ask_tool.description
# Verify that agent1.delegate_to is None
assert agent1.delegate_to is None

View File

@@ -4021,3 +4021,238 @@ def test_crew_with_knowledge_sources_works_with_copy():
assert len(crew_copy.tasks) == len(crew.tasks)
assert len(crew_copy.tasks) == len(crew.tasks)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_specific_delegation():
"""Test that agents in a crew can delegate to specific agents using the delegate_to property."""
# Create editor agent first since it will be referenced in writer's delegate_to
editor = Agent(
role="Editor",
goal="Edit content",
backstory="You're an expert editor",
allow_delegation=True,
)
# Create writer with delegate_to set during initialization
writer = Agent(
role="Writer",
goal="Write content",
backstory="You're an expert writer",
allow_delegation=True,
delegate_to=[editor], # Writer can only delegate to Editor
)
# Create researcher with delegate_to set during initialization
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You're an expert researcher",
allow_delegation=True,
delegate_to=[writer], # Researcher can only delegate to Writer
)
# Create tasks
task1 = Task(
description="Research a topic",
expected_output="Research results",
agent=researcher,
)
task2 = Task(
description="Write an article",
expected_output="Written article",
agent=writer,
)
# Create crew
crew = Crew(
agents=[researcher, writer, editor],
tasks=[task1, task2],
)
# Test that the _add_delegation_tools method respects the delegate_to property
tools = []
tools_with_delegation = crew._add_delegation_tools(task1, tools)
# Verify that delegation tools are added
assert len(tools_with_delegation) > 0
# Find the delegation tool
delegate_tool = None
for tool in tools_with_delegation:
if "Delegate" in tool.name:
delegate_tool = tool
break
assert delegate_tool is not None
# Verify that the delegation tool only includes the writer
assert "Writer" in delegate_tool.description
assert "Editor" not in delegate_tool.description
assert "Researcher" not in delegate_tool.description
# Test delegation for the writer
tools = []
tools_with_delegation = crew._add_delegation_tools(task2, tools)
# Find the delegation tool
delegate_tool = None
for tool in tools_with_delegation:
if "Delegate" in tool.name:
delegate_tool = tool
break
assert delegate_tool is not None
# Verify that the delegation tool only includes the editor
assert "Editor" in delegate_tool.description
assert "Writer" not in delegate_tool.description
assert "Researcher" not in delegate_tool.description
@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_agent_with_tools_and_delegation():
"""Test that a manager agent can have tools and still delegate to all agents."""
# Create a simple tool for the manager
class SimpleTestTool(BaseTool):
name: str = "Simple Test Tool"
description: str = "A simple test tool"
def _run(self) -> str:
return "Tool executed"
# Create agents
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You're an expert researcher",
)
writer = Agent(
role="Writer",
goal="Write content",
backstory="You're an expert writer",
)
# Create a manager agent with tools
manager = Agent(
role="Manager",
goal="Manage the team",
backstory="You're an expert manager",
tools=[SimpleTestTool()],
allow_delegation=True,
)
# Create a crew with the manager agent
crew = Crew(
agents=[researcher, writer],
manager_agent=manager,
process=Process.hierarchical,
)
# Verify that the manager agent has tools
assert len(manager.tools) == 1
assert manager.tools[0].name == "Simple Test Tool"
# Verify that the manager agent can delegate to all agents
assert manager.allow_delegation is True
assert manager.delegate_to == crew.agents
# Create a task
task = Task(
description="Complete a project",
expected_output="Project completed",
)
# Create a crew with the task
crew = Crew(
agents=[researcher, writer],
manager_agent=manager,
tasks=[task],
process=Process.hierarchical,
)
# Mock the execute_task method to avoid actual execution
with patch.object(Agent, "execute_task", return_value="Task executed"):
# Run the crew
result = crew.kickoff()
# Verify that the result is as expected
assert result.raw == "Task executed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_default_delegation():
"""Test that an agent with allow_delegation=True but without delegate_to specified can delegate to all agents in the crew."""
# Create agents
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You're an expert researcher",
allow_delegation=True, # Allow delegation but don't specify delegate_to
)
writer = Agent(
role="Writer",
goal="Write content",
backstory="You're an expert writer",
allow_delegation=True, # Allow delegation but don't specify delegate_to
)
editor = Agent(
role="Editor",
goal="Edit content",
backstory="You're an expert editor",
allow_delegation=True, # Allow delegation but don't specify delegate_to
)
# Create tasks
task1 = Task(
description="Research a topic",
expected_output="Research results",
agent=researcher,
)
task2 = Task(
description="Write content based on research",
expected_output="Written content",
agent=writer,
)
task3 = Task(
description="Edit the content",
expected_output="Edited content",
agent=editor,
)
# Create crew
crew = Crew(
agents=[researcher, writer, editor],
tasks=[task1, task2, task3],
)
# Verify that all agents have allow_delegation=True
for agent in crew.agents:
assert agent.allow_delegation is True
# Verify that delegate_to is None (default delegation to all)
assert agent.delegate_to is None
# Get delegation tools for researcher
delegation_tools = researcher.get_delegation_tools(crew.agents)
# Verify that tools for all agents are returned
assert len(delegation_tools) == 2 # Delegate and Ask tools
# Check that the tools can delegate to all agents
delegate_tool = delegation_tools[0]
ask_tool = delegation_tools[1]
# Verify the tools description includes all agents
assert "Researcher" in delegate_tool.description
assert "Writer" in delegate_tool.description
assert "Editor" in delegate_tool.description
assert "Researcher" in ask_tool.description
assert "Writer" in ask_tool.description
assert "Editor" in ask_tool.description