Compare commits

...

11 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
a024f576b3 Merge branch 'main' into feat/agent-delegation-control 2025-03-14 11:04:35 -04:00
Brandon Hancock
f232f11ad9 getting ready 2025-03-14 10:31:18 -04:00
Brandon Hancock
c334feea7e Clean up tests 2025-03-14 10:09:21 -04:00
Brandon Hancock
4b6498de8b fix tests 2025-03-14 09:35:13 -04:00
Brandon Hancock
0a6098fb50 more fixes 2025-03-13 15:47:02 -04:00
Brandon Hancock
358befe2c1 wip 2025-03-13 15:45:11 -04:00
Brandon Hancock
cb86594f92 More sequences 2025-03-13 15:33:23 -04:00
Brandon Hancock
403890d8e8 wip 2025-03-13 11:02:28 -04:00
Brandon Hancock
bd27d03bc7 more test improvements 2025-03-13 10:38:18 -04:00
Brandon Hancock
3e563365a2 fix failing tests 2025-03-13 10:32:07 -04:00
Brandon Hancock
f4186fad14 wip 2025-03-13 10:23:09 -04:00
8 changed files with 708 additions and 61 deletions

View File

@@ -1,7 +1,7 @@
import re
import shutil
import subprocess
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from typing import Any, Dict, List, Literal, Optional, Sequence, Union, cast
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -50,6 +50,7 @@ class Agent(BaseAgent):
max_rpm: Maximum number of requests per minute for the agent execution to be respected.
verbose: Whether the agent execution should be in verbose mode.
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
delegate_to: List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents.
tools: Tools at agents disposal
step_callback: Callback to be executed after each step of the agent execution.
knowledge_sources: Knowledge sources for the agent.
@@ -342,10 +343,17 @@ class Agent(BaseAgent):
callbacks=[TokenCalcHandler(self._token_process)],
)
def get_delegation_tools(self, agents: List[BaseAgent]):
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
def get_delegation_tools(self, agents: Sequence[BaseAgent]) -> Sequence[BaseTool]:
# If delegate_to is specified, use those agents instead of all agents
agents_to_use: List[BaseAgent]
if self.delegate_to is not None:
agents_to_use = cast(List[BaseAgent], list(self.delegate_to))
else:
agents_to_use = list(agents) # Convert to list to match expected type
agent_tools = AgentTools(agents=agents_to_use)
delegation_tools = agent_tools.tools()
return delegation_tools
def get_multimodal_tools(self) -> Sequence[BaseTool]:
from crewai.tools.agent_tools.add_image_tool import AddImageTool

View File

@@ -2,7 +2,7 @@ import uuid
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Dict, List, Optional, TypeVar
from typing import Any, Dict, List, Optional, Sequence, TypeVar
from pydantic import (
UUID4,
@@ -42,6 +42,7 @@ class BaseAgent(ABC, BaseModel):
verbose (bool): Verbose mode for the Agent Execution.
max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution.
allow_delegation (bool): Allow delegation of tasks to agents.
delegate_to (Optional[List["BaseAgent"]]): List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents.
tools (Optional[List[Any]]): Tools at the agent's disposal.
max_iter (int): Maximum iterations for an agent to execute a task.
agent_executor (InstanceOf): An instance of the CrewAgentExecutor class.
@@ -63,7 +64,7 @@ class BaseAgent(ABC, BaseModel):
Abstract method to create an agent executor.
_parse_tools(tools: List[BaseTool]) -> List[Any]:
Abstract method to parse tools.
get_delegation_tools(agents: List["BaseAgent"]):
get_delegation_tools(agents: Sequence["BaseAgent"]) -> Sequence[BaseTool]:
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
get_output_converter(llm, model, instructions):
Abstract method to get the converter class for the agent to create json/pydantic outputs.
@@ -113,6 +114,10 @@ class BaseAgent(ABC, BaseModel):
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
delegate_to: Optional[List["BaseAgent"]] = Field(
default=None,
description="List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents.",
)
tools: Optional[List[BaseTool]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
@@ -258,7 +263,7 @@ class BaseAgent(ABC, BaseModel):
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
def get_delegation_tools(self, agents: Sequence["BaseAgent"]) -> Sequence[BaseTool]:
"""Set the task tools that init BaseAgenTools class."""
pass
@@ -285,6 +290,7 @@ class BaseAgent(ABC, BaseModel):
"knowledge_sources",
"knowledge_storage",
"knowledge",
"delegate_to",
}
# Copy llm
@@ -310,6 +316,10 @@ class BaseAgent(ABC, BaseModel):
copied_source.storage = shared_storage
existing_knowledge_sources.append(copied_source)
existing_delegate_to = None
if self.delegate_to:
existing_delegate_to = list(self.delegate_to)
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_agent = type(self)(
@@ -319,6 +329,7 @@ class BaseAgent(ABC, BaseModel):
knowledge_sources=existing_knowledge_sources,
knowledge=copied_knowledge,
knowledge_storage=copied_knowledge_storage,
delegate_to=existing_delegate_to,
)
return copied_agent

View File

@@ -6,7 +6,7 @@ import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union
from pydantic import (
UUID4,
@@ -36,6 +36,7 @@ from crewai.security import Fingerprint, SecurityConfig
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.types.usage_metrics import UsageMetrics
@@ -759,22 +760,27 @@ class Crew(BaseModel):
def _create_manager_agent(self):
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
# Ensure delegation is enabled for the manager agent
self.manager_agent.allow_delegation = True
# Set the delegate_to property to all agents in the crew
# If delegate_to is already set, it will be used instead of all agents
if self.manager_agent.delegate_to is None:
self.manager_agent.delegate_to = self.agents
manager = self.manager_agent
if manager.tools is not None and len(manager.tools) > 0:
self._logger.log(
"warning", "Manager agent should not have tools", color="orange"
)
manager.tools = []
raise Exception("Manager agent should not have tools")
else:
self.manager_llm = create_llm(self.manager_llm)
# Create delegation tools
delegation_tools = AgentTools(agents=self.agents).tools()
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
tools=delegation_tools,
allow_delegation=True,
delegate_to=self.agents,
llm=self.manager_llm,
verbose=self.verbose,
)
@@ -818,8 +824,8 @@ class Crew(BaseModel):
)
# Determine which tools to use - task tools take precedence over agent tools
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
initial_tools = task.tools or agent_to_use.tools or []
prepared_tools = self._prepare_tools(agent_to_use, task, initial_tools)
self._log_task_start(task, agent_to_use.role)
@@ -838,7 +844,7 @@ class Crew(BaseModel):
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=tools_for_task,
tools=prepared_tools,
)
futures.append((task, future, task_index))
else:
@@ -850,7 +856,7 @@ class Crew(BaseModel):
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=tools_for_task,
tools=prepared_tools,
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
@@ -888,8 +894,8 @@ class Crew(BaseModel):
return None
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: List[Tool]
) -> List[Tool]:
self, agent: BaseAgent, task: Task, tools: Sequence[BaseTool]
) -> list[BaseTool]:
# Add delegation tools if agent allows delegation
if agent.allow_delegation:
if self.process == Process.hierarchical:
@@ -904,13 +910,15 @@ class Crew(BaseModel):
tools = self._add_delegation_tools(task, tools)
# Add code execution tools if agent allows code execution
if agent.allow_code_execution:
if hasattr(agent, "allow_code_execution") and getattr(
agent, "allow_code_execution", False
):
tools = self._add_code_execution_tools(agent, tools)
if agent and agent.multimodal:
if hasattr(agent, "multimodal") and getattr(agent, "multimodal", False):
tools = self._add_multimodal_tools(agent, tools)
return tools
return list(tools)
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
@@ -918,8 +926,8 @@ class Crew(BaseModel):
return task.agent
def _merge_tools(
self, existing_tools: List[Tool], new_tools: List[Tool]
) -> List[Tool]:
self, existing_tools: Sequence[BaseTool], new_tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
if not new_tools:
return existing_tools
@@ -936,21 +944,42 @@ class Crew(BaseModel):
return tools
def _inject_delegation_tools(
self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
self,
tools: Sequence[BaseTool],
task_agent: BaseAgent,
agents: Sequence[BaseAgent],
):
delegation_tools = task_agent.get_delegation_tools(agents)
return self._merge_tools(tools, delegation_tools)
def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]):
multimodal_tools = agent.get_multimodal_tools()
return self._merge_tools(tools, multimodal_tools)
def _add_multimodal_tools(
self, agent: BaseAgent, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
if hasattr(agent, "get_multimodal_tools"):
multimodal_tools = getattr(agent, "get_multimodal_tools")()
return self._merge_tools(tools, multimodal_tools)
return tools
def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]):
code_tools = agent.get_code_execution_tools()
return self._merge_tools(tools, code_tools)
def _add_code_execution_tools(
self, agent: BaseAgent, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
if hasattr(agent, "get_code_execution_tools"):
code_tools = getattr(agent, "get_code_execution_tools")()
return self._merge_tools(tools, code_tools)
return tools
def _add_delegation_tools(
self, task: Task, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
# If the agent has specific agents to delegate to, use those
if task.agent and task.agent.delegate_to is not None:
agents_for_delegation = task.agent.delegate_to
else:
# Otherwise use all agents except the current one
agents_for_delegation = [
agent for agent in self.agents if agent != task.agent
]
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
tools = []
@@ -965,7 +994,7 @@ class Crew(BaseModel):
task_name=task.name, task=task.description, agent=role, status="started"
)
def _update_manager_tools(self, task: Task, tools: List[Tool]):
def _update_manager_tools(self, task: Task, tools: Sequence[BaseTool]):
if self.manager_agent:
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])

View File

@@ -1,3 +1,5 @@
from typing import List
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.utilities import I18N
@@ -9,11 +11,11 @@ from .delegate_work_tool import DelegateWorkTool
class AgentTools:
"""Manager class for agent-related tools"""
def __init__(self, agents: list[BaseAgent], i18n: I18N = I18N()):
def __init__(self, agents: List[BaseAgent], i18n: I18N = I18N()):
self.agents = agents
self.i18n = i18n
def tools(self) -> list[BaseTool]:
def tools(self) -> List[BaseTool]:
"""Get all available agent tools"""
coworkers = ", ".join([f"{agent.role}" for agent in self.agents])

View File

@@ -1,5 +1,5 @@
import logging
from typing import Optional
from typing import Optional, Sequence
from pydantic import Field
@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class BaseAgentTool(BaseTool):
"""Base class for agent-related tools"""
agents: list[BaseAgent] = Field(description="List of available agents")
agents: Sequence[BaseAgent] = Field(description="List of available agents")
i18n: I18N = Field(
default_factory=I18N, description="Internationalization settings"
)
@@ -47,10 +47,7 @@ class BaseAgentTool(BaseTool):
return coworker
def _execute(
self,
agent_name: Optional[str],
task: str,
context: Optional[str] = None
self, agent_name: Optional[str], task: str, context: Optional[str] = None
) -> str:
"""
Execute delegation to an agent with case-insensitive and whitespace-tolerant matching.
@@ -77,33 +74,43 @@ class BaseAgentTool(BaseTool):
# when it should look like this:
# {"task": "....", "coworker": "...."}
sanitized_name = self.sanitize_agent_name(agent_name)
logger.debug(f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'")
logger.debug(
f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'"
)
available_agents = [agent.role for agent in self.agents]
logger.debug(f"Available agents: {available_agents}")
agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None")
agent = [ # type: ignore # Incompatible types in assignment (expression has type "Sequence[BaseAgent]", variable has type "str | None")
available_agent
for available_agent in self.agents
if self.sanitize_agent_name(available_agent.role) == sanitized_name
]
logger.debug(f"Found {len(agent)} matching agents for role '{sanitized_name}'")
logger.debug(
f"Found {len(agent)} matching agents for role '{sanitized_name}'"
)
except (AttributeError, ValueError) as e:
# Handle specific exceptions that might occur during role name processing
return self.i18n.errors("agent_tool_unexisting_coworker").format(
coworkers="\n".join(
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
[
f"- {self.sanitize_agent_name(agent.role)}"
for agent in self.agents
]
),
error=str(e)
error=str(e),
)
if not agent:
# No matching agent found after sanitization
return self.i18n.errors("agent_tool_unexisting_coworker").format(
coworkers="\n".join(
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
[
f"- {self.sanitize_agent_name(agent.role)}"
for agent in self.agents
]
),
error=f"No agent found with role '{sanitized_name}'"
error=f"No agent found with role '{sanitized_name}'",
)
agent = agent[0]
@@ -114,11 +121,12 @@ class BaseAgentTool(BaseTool):
expected_output=agent.i18n.slice("manager_request"),
i18n=agent.i18n,
)
logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}")
logger.debug(
f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}"
)
return agent.execute_task(task_with_assigned_agent, context)
except Exception as e:
# Handle task creation or execution errors
return self.i18n.errors("agent_tool_execution_error").format(
agent_role=self.sanitize_agent_name(agent.role),
error=str(e)
agent_role=self.sanitize_agent_name(agent.role), error=str(e)
)

View File

@@ -248,13 +248,18 @@ def to_langchain(
def tool(*args):
"""
Decorator to create a tool from a function.
Ensures the decorated function is always wrapped as a BaseTool.
"""
def _make_with_name(tool_name: str) -> Callable:
def _make_with_name(tool_name: str) -> Callable[[Callable], BaseTool]:
def _make_tool(f: Callable) -> BaseTool:
# If f is already a BaseTool, return it
if isinstance(f, BaseTool):
return f
if f.__doc__ is None:
raise ValueError("Function must have a docstring")
if f.__annotations__ is None:
if not f.__annotations__:
raise ValueError("Function must have type annotations")
class_name = "".join(tool_name.split()).title()
@@ -278,7 +283,10 @@ def tool(*args):
return _make_tool
if len(args) == 1 and callable(args[0]):
if isinstance(args[0], BaseTool):
return args[0]
return _make_with_name(args[0].__name__)(args[0])
if len(args) == 1 and isinstance(args[0], str):
elif len(args) == 1 and isinstance(args[0], str):
return _make_with_name(args[0])
raise ValueError("Invalid arguments")
else:
raise ValueError("Invalid arguments")

View File

@@ -1797,3 +1797,136 @@ def test_litellm_anthropic_error_handling():
# Verify the LLM call was only made once (no retries)
mock_llm_call.assert_called_once()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_delegation_to_specific_agents():
"""Test that an agent can delegate to specific agents using the delegate_to property."""
# Create agents in order so we can reference them in delegate_to
agent2 = Agent(
role="Agent 2",
goal="Goal for Agent 2",
backstory="Backstory for Agent 2",
allow_delegation=True,
)
agent3 = Agent(
role="Agent 3",
goal="Goal for Agent 3",
backstory="Backstory for Agent 3",
allow_delegation=True,
)
# Create agent1 without specific delegation first to test default behavior
agent1 = Agent(
role="Agent 1",
goal="Goal for Agent 1",
backstory="Backstory for Agent 1",
allow_delegation=True,
)
# Test default behavior (delegate to all agents)
all_agents = [agent1, agent2, agent3]
delegation_tools = agent1.get_delegation_tools(all_agents)
# Verify that tools for all agents are returned
assert len(delegation_tools) == 2 # Delegate and Ask tools
# Check that the tools can delegate to all agents
delegate_tool = delegation_tools[0]
ask_tool = delegation_tools[1]
# Verify the tools description includes all agents
assert "Agent 1" in delegate_tool.description
assert "Agent 2" in delegate_tool.description
assert "Agent 3" in delegate_tool.description
assert "Agent 1" in ask_tool.description
assert "Agent 2" in ask_tool.description
assert "Agent 3" in ask_tool.description
# Test delegation to specific agents by creating a new agent with delegate_to
agent1_with_specific_delegation = Agent(
role="Agent 1",
goal="Goal for Agent 1",
backstory="Backstory for Agent 1",
allow_delegation=True,
delegate_to=[agent2], # Only delegate to agent2
)
specific_delegation_tools = agent1_with_specific_delegation.get_delegation_tools(
all_agents
)
# Verify that tools for only the specified agent are returned
assert len(specific_delegation_tools) == 2 # Delegate and Ask tools
# Check that the tools can only delegate to agent2
specific_delegate_tool = specific_delegation_tools[0]
specific_ask_tool = specific_delegation_tools[1]
# Verify the tools description includes only agent2
assert "Agent 2" in specific_delegate_tool.description
assert "Agent 1" not in specific_delegate_tool.description
assert "Agent 3" not in specific_delegate_tool.description
assert "Agent 2" in specific_ask_tool.description
assert "Agent 1" not in specific_ask_tool.description
assert "Agent 3" not in specific_ask_tool.description
def test_agent_copy_with_delegate_to():
"""Test that the delegate_to attribute is properly copied when copying an agent."""
# Create a few agents for delegation
agent1 = Agent(
role="Researcher",
goal="Research topics",
backstory="Experienced researcher",
)
agent2 = Agent(
role="Writer",
goal="Write content",
backstory="Professional writer",
)
agent3 = Agent(
role="Manager",
goal="Manage the team",
backstory="Expert manager",
allow_delegation=True,
delegate_to=[agent1, agent2], # This manager can delegate to agent1 and agent2
)
# Make a copy of the manager agent
copied_agent3 = agent3.copy()
# Verify the copied agent has the same delegation settings
assert copied_agent3.allow_delegation == agent3.allow_delegation
assert (
copied_agent3.delegate_to is not agent3.delegate_to
) # Should be different objects
assert copied_agent3.delegate_to is not None
assert agent3.delegate_to is not None
assert len(copied_agent3.delegate_to) == len(agent3.delegate_to)
assert all(a in copied_agent3.delegate_to for a in agent3.delegate_to)
# Modify the original agent's delegate_to list
assert agent3.delegate_to is not None
agent3.delegate_to.pop()
# Verify the copied agent's delegate_to list is not affected
assert copied_agent3.delegate_to is not None
assert agent3.delegate_to is not None
assert len(copied_agent3.delegate_to) == 2
assert len(agent3.delegate_to) == 1
# Test copying an agent with delegate_to=None
agent4 = Agent(
role="Solo Worker",
goal="Work independently",
backstory="Independent worker",
allow_delegation=False,
delegate_to=None,
)
copied_agent4 = agent4.copy()
assert copied_agent4.delegate_to == agent4.delegate_to

View File

@@ -724,13 +724,14 @@ def test_task_tools_override_agent_tools():
crew.kickoff()
# Verify task tools override agent tools
assert task.tools is not None
assert len(task.tools) == 1 # AnotherTestTool
assert any(isinstance(tool, AnotherTestTool) for tool in task.tools)
assert not any(isinstance(tool, TestTool) for tool in task.tools)
# Verify agent tools remain unchanged
assert new_researcher.tools is not None
assert len(new_researcher.tools) == 1
assert isinstance(new_researcher.tools[0], TestTool)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -868,11 +869,17 @@ def test_crew_verbose_output(capsys):
event_listener.formatter.verbose = False
crew.kickoff()
captured = capsys.readouterr()
# Filter out event listener logs, escape codes, and now also 'tools:' lines
filtered_output = "\n".join(
line
for line in captured.out.split("\n")
if not line.startswith("[") and line.strip() and not line.startswith("\x1b")
if not line.startswith("[")
and line.strip()
and not line.startswith("\x1b")
and not "tools:" in line.lower() # Exclude 'tools:' lines
)
assert filtered_output == ""
@@ -1599,6 +1606,8 @@ def test_crew_function_calling_llm():
crew = Crew(agents=[agent1], tasks=[essay])
result = crew.kickoff()
assert result.raw == "Howdy!"
assert agent1.tools is not None
assert len(agent1.tools) == 1
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -4025,3 +4034,442 @@ def test_crew_with_knowledge_sources_works_with_copy():
assert len(crew_copy.tasks) == len(crew.tasks)
assert len(crew_copy.tasks) == len(crew.tasks)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_specific_delegation():
"""Test that agents in a crew can delegate to specific agents using the delegate_to property."""
# Create editor agent first since it will be referenced in writer's delegate_to
editor = Agent(
role="Editor",
goal="Edit content",
backstory="You're an expert editor",
allow_delegation=True,
)
# Create writer with delegate_to set during initialization
writer = Agent(
role="Writer",
goal="Write content",
backstory="You're an expert writer",
allow_delegation=True,
delegate_to=[editor], # Writer can only delegate to Editor
)
# Create researcher with delegate_to set during initialization
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You're an expert researcher",
allow_delegation=True,
delegate_to=[writer], # Researcher can only delegate to Writer
)
# Create tasks
task1 = Task(
description="Research a topic",
expected_output="Research results",
agent=researcher,
)
task2 = Task(
description="Write an article",
expected_output="Written article",
agent=writer,
)
# Create crew
crew = Crew(
agents=[researcher, writer, editor],
tasks=[task1, task2],
)
# Test that the _add_delegation_tools method respects the delegate_to property
tools = []
tools_with_delegation = crew._add_delegation_tools(task1, tools)
# Verify that delegation tools are added
assert len(tools_with_delegation) > 0
# Find the delegation tool
delegate_tool = None
for tool in tools_with_delegation:
if "Delegate" in tool.name:
delegate_tool = tool
break
assert delegate_tool is not None
# Verify that the delegation tool only includes the writer
assert "Writer" in delegate_tool.description
assert "Editor" not in delegate_tool.description
assert "Researcher" not in delegate_tool.description
# Test delegation for the writer
tools = []
tools_with_delegation = crew._add_delegation_tools(task2, tools)
# Find the delegation tool
delegate_tool = None
for tool in tools_with_delegation:
if "Delegate" in tool.name:
delegate_tool = tool
break
assert delegate_tool is not None
# Verify that the delegation tool only includes the editor
assert "Editor" in delegate_tool.description
assert "Writer" not in delegate_tool.description
assert "Researcher" not in delegate_tool.description
@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_agent_with_tools_and_delegation():
"""Test that a manager agent can have tools and still delegate to all agents."""
from crewai.tools.base_tool import BaseTool
# Create a simple tool for the manager
class SimpleTestTool(BaseTool):
name: str = "Simple Test Tool"
description: str = "A simple test tool"
def _run(self) -> str:
return "Tool executed"
# Create agents
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You're an expert researcher",
)
writer = Agent(
role="Writer",
goal="Write content",
backstory="You're an expert writer",
)
# Create a manager agent with tools
manager = Agent(
role="Manager",
goal="Manage the team",
backstory="You're an expert manager",
tools=[SimpleTestTool()],
allow_delegation=True,
)
# Create a crew with the manager agent
crew = Crew(
agents=[researcher, writer],
manager_agent=manager,
process=Process.hierarchical,
)
# Explicitly call _create_manager_agent to set up delegation
crew._create_manager_agent()
# Verify that the manager agent has tools
assert manager.tools is not None
assert len(manager.tools) == 1
assert manager.tools[0].name == "Simple Test Tool"
# Verify that the manager agent can delegate to all agents
assert manager.allow_delegation is True
assert manager.delegate_to == crew.agents
# Create a task
task = Task(
description="Complete a project",
expected_output="Project completed",
)
# Create a crew with the task
crew = Crew(
agents=[researcher, writer],
manager_agent=manager,
tasks=[task],
process=Process.hierarchical,
)
# Mock the execute_task method to avoid actual execution
with patch.object(Agent, "execute_task", return_value="Task executed"):
# Run the crew
result = crew.kickoff()
# Verify that the result is as expected
assert result.raw == "Task executed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_default_delegation():
"""Test that an agent with allow_delegation=True but without delegate_to specified can delegate to all agents in the crew."""
# Create agents
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You're an expert researcher",
allow_delegation=True, # Allow delegation but don't specify delegate_to
)
writer = Agent(
role="Writer",
goal="Write content",
backstory="You're an expert writer",
allow_delegation=True, # Allow delegation but don't specify delegate_to
)
editor = Agent(
role="Editor",
goal="Edit content",
backstory="You're an expert editor",
allow_delegation=True, # Allow delegation but don't specify delegate_to
)
# Create tasks
task1 = Task(
description="Research a topic",
expected_output="Research results",
agent=researcher,
)
task2 = Task(
description="Write content based on research",
expected_output="Written content",
agent=writer,
)
task3 = Task(
description="Edit the content",
expected_output="Edited content",
agent=editor,
)
# Create crew
crew = Crew(
agents=[researcher, writer, editor],
tasks=[task1, task2, task3],
)
# Verify that all agents have allow_delegation=True
for agent in crew.agents:
assert agent.allow_delegation is True
# Verify that delegate_to is None (default delegation to all)
assert agent.delegate_to is None
# Get delegation tools for researcher
delegation_tools = researcher.get_delegation_tools(crew.agents)
# Verify that tools for all agents are returned
assert len(delegation_tools) == 2 # Delegate and Ask tools
# Check that the tools can delegate to all agents
delegate_tool = delegation_tools[0]
ask_tool = delegation_tools[1]
# Verify the tools description includes all agents
assert "Researcher" in delegate_tool.description
assert "Writer" in delegate_tool.description
assert "Editor" in delegate_tool.description
assert "Researcher" in ask_tool.description
assert "Writer" in ask_tool.description
assert "Editor" in ask_tool.description
@pytest.mark.vcr(filter_headers=["authorization"])
def test_update_manager_tools_functionality():
"""Test that _update_manager_tools correctly adds delegation tools to the manager agent."""
# Create agents
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You're an expert researcher",
)
writer = Agent(
role="Writer",
goal="Write content",
backstory="You're an expert writer",
)
# Create a manager agent
manager = Agent(
role="Manager",
goal="Manage the team",
backstory="You're an expert manager",
allow_delegation=True,
)
# Create a crew with the manager agent
crew = Crew(
agents=[researcher, writer],
manager_agent=manager,
process=Process.hierarchical,
)
# Ensure the manager agent is set up
crew._create_manager_agent()
# Case 1: Task with an assigned agent
task_with_agent = Task(
description="Research a topic",
expected_output="Research results",
agent=researcher,
)
# Create an initial set of tools
from crewai.tools.base_tool import BaseTool
class TestTool(BaseTool):
name: str = "Test Tool"
description: str = "A test tool"
def _run(self) -> str:
return "Tool executed"
initial_tools = [TestTool()]
# Test _update_manager_tools with a task that has an agent
updated_tools = crew._update_manager_tools(task_with_agent, initial_tools)
# Verify that delegation tools for the task's agent were added
assert len(updated_tools) > len(initial_tools)
assert any(
f"Delegate a specific task to one of the following coworkers: {researcher.role}"
in tool.description
for tool in updated_tools
)
assert any(
f"Ask a specific question to one of the following coworkers: {researcher.role}"
in tool.description
for tool in updated_tools
)
# Case 2: Task without an assigned agent
task_without_agent = Task(
description="General task",
expected_output="Task completed",
)
# Test _update_manager_tools with a task that doesn't have an agent
updated_tools = crew._update_manager_tools(task_without_agent, initial_tools)
# Verify that delegation tools for all agents were added
assert len(updated_tools) > len(initial_tools)
assert any(
f"Delegate a specific task to one of the following coworkers: {researcher.role}, {writer.role}"
in tool.description
for tool in updated_tools
)
assert any(
f"Ask a specific question to one of the following coworkers: {researcher.role}, {writer.role}"
in tool.description
for tool in updated_tools
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_tools_during_task_execution():
"""Test that manager tools are correctly added during task execution in a hierarchical process."""
# Create agents
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You're an expert researcher",
)
writer = Agent(
role="Writer",
goal="Write content",
backstory="You're an expert writer",
)
# Create tasks
task_with_agent = Task(
description="Research a topic",
expected_output="Research results",
agent=researcher,
)
task_without_agent = Task(
description="General task",
expected_output="Task completed",
)
# Create a crew with hierarchical process
crew_with_agent_task = Crew(
agents=[researcher, writer],
tasks=[task_with_agent],
process=Process.hierarchical,
manager_llm="gpt-4o",
)
crew_without_agent_task = Crew(
agents=[researcher, writer],
tasks=[task_without_agent],
process=Process.hierarchical,
manager_llm="gpt-4o",
)
# Mock task execution to capture the tools
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
)
# Test case 1: Task with an assigned agent
with patch.object(
Task, "execute_sync", return_value=mock_task_output
) as mock_execute_sync:
# Set the output attribute to avoid None errors
task_with_agent.output = mock_task_output
# Execute the crew
crew_with_agent_task.kickoff()
# Verify execute_sync was called
mock_execute_sync.assert_called_once()
# Get the tools argument from the call
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
# Verify that delegation tools for the task's agent were added
assert any(
f"Delegate a specific task to one of the following coworkers: {researcher.role}"
in tool.description
for tool in tools
)
assert any(
f"Ask a specific question to one of the following coworkers: {researcher.role}"
in tool.description
for tool in tools
)
# Test case 2: Task without an assigned agent
with patch.object(
Task, "execute_sync", return_value=mock_task_output
) as mock_execute_sync:
# Set the output attribute to avoid None errors
task_without_agent.output = mock_task_output
# Execute the crew
crew_without_agent_task.kickoff()
# Verify execute_sync was called
mock_execute_sync.assert_called_once()
# Get the tools argument from the call
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
# Verify that delegation tools for all agents were added
assert any(
f"Delegate a specific task to one of the following coworkers: {researcher.role}, {writer.role}"
in tool.description
for tool in tools
)
assert any(
f"Ask a specific question to one of the following coworkers: {researcher.role}, {writer.role}"
in tool.description
for tool in tools
)