mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-01 07:13:00 +00:00
Compare commits
1 Commits
devin/1739
...
devin/1757
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3bfa1c6559 |
@@ -107,17 +107,6 @@ class BaseAgent(ABC, BaseModel):
|
||||
default=False,
|
||||
description="Enable agent to delegate and ask questions among each other.",
|
||||
)
|
||||
allowed_agents: Optional[List[str]] = Field(
|
||||
default=None,
|
||||
description="List of agent roles that this agent is allowed to delegate tasks to.",
|
||||
docstring="""
|
||||
Specifies which agent roles this agent can delegate tasks to. When set:
|
||||
- Must be a list of role names as strings
|
||||
- Cannot be empty if delegation is enabled
|
||||
- Case-insensitive matching is used for role names
|
||||
- If None, agent can delegate to any other agent (when allow_delegation is True)
|
||||
""",
|
||||
)
|
||||
tools: Optional[List[Any]] = Field(
|
||||
default_factory=list, description="Tools at agents' disposal"
|
||||
)
|
||||
@@ -185,9 +174,6 @@ class BaseAgent(ABC, BaseModel):
|
||||
f"{field} must be provided either directly or through config"
|
||||
)
|
||||
|
||||
# Validate allowed_agents configuration
|
||||
self._validate_allowed_agents()
|
||||
|
||||
# Set private attributes
|
||||
self._logger = Logger(verbose=self.verbose)
|
||||
if self.max_rpm and not self._rpm_controller:
|
||||
@@ -228,24 +214,6 @@ class BaseAgent(ABC, BaseModel):
|
||||
]
|
||||
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
|
||||
|
||||
def _validate_allowed_agents(self) -> None:
|
||||
"""Validate allowed_agents configuration.
|
||||
|
||||
Raises:
|
||||
ValueError: If allowed_agents is not properly configured:
|
||||
- Not a list of strings when specified
|
||||
- Empty list when delegation is enabled
|
||||
- Contains non-string entries
|
||||
"""
|
||||
if self.allow_delegation and self.allowed_agents is not None:
|
||||
if not isinstance(self.allowed_agents, list):
|
||||
raise ValueError("allowed_agents must be a list of strings")
|
||||
if not all(isinstance(agent, str) for agent in self.allowed_agents):
|
||||
raise ValueError("all entries in allowed_agents must be strings")
|
||||
if len(self.allowed_agents) == 0:
|
||||
raise ValueError("allowed_agents cannot be empty when delegation is enabled")
|
||||
|
||||
|
||||
@abstractmethod
|
||||
def execute_task(
|
||||
self,
|
||||
|
||||
@@ -112,6 +112,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
try:
|
||||
while not isinstance(formatted_answer, AgentFinish):
|
||||
if not self.request_within_rpm_limit or self.request_within_rpm_limit():
|
||||
self._check_context_length_before_call()
|
||||
|
||||
answer = self.llm.call(
|
||||
self.messages,
|
||||
callbacks=self.callbacks,
|
||||
@@ -327,6 +329,19 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
)
|
||||
]
|
||||
|
||||
def _check_context_length_before_call(self) -> None:
|
||||
total_chars = sum(len(msg.get("content", "")) for msg in self.messages)
|
||||
estimated_tokens = total_chars // 4
|
||||
|
||||
context_window_size = self.llm.get_context_window_size()
|
||||
|
||||
if estimated_tokens > context_window_size:
|
||||
self._printer.print(
|
||||
content=f"Estimated token count ({estimated_tokens}) exceeds context window ({context_window_size}). Handling proactively.",
|
||||
color="yellow",
|
||||
)
|
||||
self._handle_context_length()
|
||||
|
||||
def _handle_context_length(self) -> None:
|
||||
if self.respect_context_window:
|
||||
self._printer.print(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from typing import Optional, Union
|
||||
from pydantic import UUID4, Field
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.task import Task
|
||||
@@ -11,44 +12,10 @@ class BaseAgentTool(BaseTool):
|
||||
"""Base class for agent-related tools"""
|
||||
|
||||
agents: list[BaseAgent] = Field(description="List of available agents")
|
||||
agent_id: UUID4 = Field(description="ID of the agent using this tool")
|
||||
i18n: I18N = Field(
|
||||
default_factory=I18N, description="Internationalization settings"
|
||||
)
|
||||
|
||||
def _get_agent_by_id(self, agent_id: UUID4) -> Optional[BaseAgent]:
|
||||
"""Helper method to find agent by ID."""
|
||||
return next((a for a in self.agents if a.id == agent_id), None)
|
||||
|
||||
def _get_agent_by_role(self, role: str) -> Optional[BaseAgent]:
|
||||
"""Helper method to find agent by role (case-insensitive)."""
|
||||
return next(
|
||||
(a for a in self.agents if a.role.casefold() == role.casefold()),
|
||||
None
|
||||
)
|
||||
|
||||
def _check_delegation_authorization(
|
||||
self, delegating_agent: BaseAgent, target_role: str
|
||||
) -> Optional[str]:
|
||||
"""Verify if delegation is authorized.
|
||||
|
||||
Args:
|
||||
delegating_agent: The agent attempting to delegate
|
||||
target_role: The role of the agent being delegated to
|
||||
|
||||
Returns:
|
||||
Optional[str]: Error message if delegation is not authorized, None otherwise
|
||||
"""
|
||||
if (delegating_agent.allowed_agents is not None and
|
||||
not any(allowed.casefold() == target_role.casefold()
|
||||
for allowed in delegating_agent.allowed_agents)):
|
||||
return self.i18n.errors("agent_tool_unauthorized_delegation").format(
|
||||
coworker=target_role,
|
||||
allowed_agents="\n".join([f"- {role}" for role in delegating_agent.allowed_agents])
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def _get_coworker(self, coworker: Optional[str], **kwargs) -> Optional[str]:
|
||||
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
|
||||
if coworker:
|
||||
@@ -91,17 +58,6 @@ class BaseAgentTool(BaseTool):
|
||||
)
|
||||
)
|
||||
|
||||
# Get delegating agent and check authorization
|
||||
delegating_agent = self._get_agent_by_id(self.agent_id)
|
||||
if not delegating_agent:
|
||||
return self.i18n.errors("agent_tool_unexisting_coworker").format(
|
||||
coworkers="\n".join([f"- {agent.role}" for agent in self.agents])
|
||||
)
|
||||
|
||||
auth_error = self._check_delegation_authorization(delegating_agent, agent[0].role)
|
||||
if auth_error:
|
||||
return auth_error
|
||||
|
||||
agent = agent[0]
|
||||
task_with_assigned_agent = Task( # type: ignore # Incompatible types in assignment (expression has type "Task", variable has type "str")
|
||||
description=task,
|
||||
|
||||
@@ -33,8 +33,7 @@
|
||||
"tool_usage_error": "I encountered an error: {error}",
|
||||
"tool_arguments_error": "Error: the Action Input is not a valid key, value dictionary.",
|
||||
"wrong_tool_name": "You tried to use the tool {tool}, but it doesn't exist. You must use one of the following tools, use one at time: {tools}.",
|
||||
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}",
|
||||
"agent_tool_unauthorized_delegation": "Authorization Error: Cannot delegate task to {coworker}.\nThis agent is only authorized to delegate to:\n{allowed_agents}\nPlease select an authorized agent for delegation."
|
||||
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}"
|
||||
},
|
||||
"tools": {
|
||||
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
|
||||
|
||||
@@ -1625,3 +1625,78 @@ def test_agent_with_knowledge_sources():
|
||||
|
||||
# Assert that the agent provides the correct information
|
||||
assert "red" in result.raw.lower()
|
||||
|
||||
|
||||
def test_proactive_context_length_handling_prevents_empty_response():
|
||||
"""Test that proactive context length checking prevents empty LLM responses."""
|
||||
agent = Agent(
|
||||
role="test role",
|
||||
goal="test goal",
|
||||
backstory="test backstory",
|
||||
sliding_context_window=True,
|
||||
)
|
||||
|
||||
long_input = "This is a very long input that should exceed the context window. " * 1000
|
||||
|
||||
with patch.object(agent.llm, 'get_context_window_size', return_value=100):
|
||||
with patch.object(agent.agent_executor, '_handle_context_length') as mock_handle:
|
||||
with patch.object(agent.llm, 'call', return_value="Proper response after summarization"):
|
||||
|
||||
agent.agent_executor.messages = [
|
||||
{"role": "user", "content": long_input}
|
||||
]
|
||||
|
||||
task = Task(
|
||||
description="Process this long input",
|
||||
expected_output="A response",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
result = agent.execute_task(task)
|
||||
|
||||
mock_handle.assert_called()
|
||||
assert result and result.strip() != ""
|
||||
|
||||
|
||||
def test_proactive_context_length_handling_with_no_summarization():
|
||||
"""Test proactive context length checking when summarization is disabled."""
|
||||
agent = Agent(
|
||||
role="test role",
|
||||
goal="test goal",
|
||||
backstory="test backstory",
|
||||
sliding_context_window=False,
|
||||
)
|
||||
|
||||
long_input = "This is a very long input. " * 1000
|
||||
|
||||
with patch.object(agent.llm, 'get_context_window_size', return_value=100):
|
||||
agent.agent_executor.messages = [
|
||||
{"role": "user", "content": long_input}
|
||||
]
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
agent.agent_executor._check_context_length_before_call()
|
||||
|
||||
|
||||
def test_context_length_estimation():
|
||||
"""Test the token estimation logic."""
|
||||
agent = Agent(
|
||||
role="test role",
|
||||
goal="test goal",
|
||||
backstory="test backstory",
|
||||
)
|
||||
|
||||
agent.agent_executor.messages = [
|
||||
{"role": "user", "content": "Short message"},
|
||||
{"role": "assistant", "content": "Another short message"},
|
||||
]
|
||||
|
||||
with patch.object(agent.llm, 'get_context_window_size', return_value=10):
|
||||
with patch.object(agent.agent_executor, '_handle_context_length') as mock_handle:
|
||||
agent.agent_executor._check_context_length_before_call()
|
||||
mock_handle.assert_not_called()
|
||||
|
||||
with patch.object(agent.llm, 'get_context_window_size', return_value=5):
|
||||
with patch.object(agent.agent_executor, '_handle_context_length') as mock_handle:
|
||||
agent.agent_executor._check_context_length_before_call()
|
||||
mock_handle.assert_called()
|
||||
|
||||
@@ -1,212 +0,0 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.crew import Crew
|
||||
from crewai.task import Task
|
||||
from crewai.tools.agent_tools.delegate_work_tool import DelegateWorkTool
|
||||
|
||||
def test_delegate_work_with_allowed_agents():
|
||||
"""Test successful delegation to allowed agent."""
|
||||
# Create agents
|
||||
executive = Agent(
|
||||
role="Executive Director",
|
||||
goal="Manage the team",
|
||||
backstory="An experienced manager",
|
||||
allow_delegation=True,
|
||||
allowed_agents=["Communications Manager"]
|
||||
)
|
||||
comms_manager = Agent(
|
||||
role="Communications Manager",
|
||||
goal="Handle communications",
|
||||
backstory="A skilled communicator",
|
||||
allow_delegation=False
|
||||
)
|
||||
|
||||
# Mock LLM to avoid actual API calls
|
||||
mock_content = "Thought: I will handle this task\nFinal Answer: Task completed successfully"
|
||||
mock_response = {
|
||||
"choices": [{
|
||||
"message": {
|
||||
"content": mock_content
|
||||
}
|
||||
}]
|
||||
}
|
||||
executive.llm = MagicMock()
|
||||
executive.llm.invoke = MagicMock(return_value=mock_response)
|
||||
executive.llm.call = MagicMock(return_value=mock_content)
|
||||
comms_manager.llm = MagicMock()
|
||||
comms_manager.llm.invoke = MagicMock(return_value=mock_response)
|
||||
comms_manager.llm.call = MagicMock(return_value=mock_content)
|
||||
|
||||
# Create crew and tool
|
||||
crew = Crew(agents=[executive, comms_manager])
|
||||
tool = DelegateWorkTool(
|
||||
name="Delegate work to coworker",
|
||||
description="Tool for delegating work to coworkers",
|
||||
agents=[executive, comms_manager],
|
||||
agent_id=executive.id
|
||||
)
|
||||
|
||||
# Test delegation
|
||||
result = tool._execute(
|
||||
agent_name="Communications Manager",
|
||||
task="Write a press release",
|
||||
context="Important company announcement"
|
||||
)
|
||||
|
||||
# Verify delegation was allowed
|
||||
assert "authorization error" not in result.lower()
|
||||
assert "cannot delegate" not in result.lower()
|
||||
|
||||
def test_delegate_work_with_unauthorized_agent():
|
||||
"""Test failed delegation to unauthorized agent."""
|
||||
# Create agents
|
||||
executive = Agent(
|
||||
role="Executive Director",
|
||||
goal="Manage the team",
|
||||
backstory="An experienced manager",
|
||||
allow_delegation=True,
|
||||
allowed_agents=["Communications Manager"]
|
||||
)
|
||||
tech_manager = Agent(
|
||||
role="Tech Manager",
|
||||
goal="Manage technology",
|
||||
backstory="A tech expert",
|
||||
allow_delegation=False
|
||||
)
|
||||
|
||||
# Mock LLM to avoid actual API calls
|
||||
mock_content = "Thought: I will handle this task\nFinal Answer: Task completed successfully"
|
||||
mock_response = {
|
||||
"choices": [{
|
||||
"message": {
|
||||
"content": mock_content
|
||||
}
|
||||
}]
|
||||
}
|
||||
executive.llm = MagicMock()
|
||||
executive.llm.invoke = MagicMock(return_value=mock_response)
|
||||
executive.llm.call = MagicMock(return_value=mock_content)
|
||||
tech_manager.llm = MagicMock()
|
||||
tech_manager.llm.invoke = MagicMock(return_value=mock_response)
|
||||
tech_manager.llm.call = MagicMock(return_value=mock_content)
|
||||
|
||||
# Create crew and tool
|
||||
crew = Crew(agents=[executive, tech_manager])
|
||||
tool = DelegateWorkTool(
|
||||
name="Delegate work to coworker",
|
||||
description="Tool for delegating work to coworkers",
|
||||
agents=[executive, tech_manager],
|
||||
agent_id=executive.id
|
||||
)
|
||||
|
||||
# Test delegation
|
||||
result = tool._execute(
|
||||
agent_name="Tech Manager",
|
||||
task="Update servers",
|
||||
context="Server maintenance needed"
|
||||
)
|
||||
|
||||
# Verify delegation was blocked with proper error message
|
||||
assert "authorization error" in result.lower()
|
||||
assert "tech manager" in result.lower()
|
||||
assert "communications manager" in result.lower()
|
||||
|
||||
@pytest.mark.parametrize("scenario", [
|
||||
{
|
||||
"name": "empty_allowed_agents",
|
||||
"delegating_agent": {
|
||||
"role": "Manager",
|
||||
"allow_delegation": True,
|
||||
"allowed_agents": []
|
||||
},
|
||||
"target_agent": "Worker",
|
||||
"should_succeed": False,
|
||||
"error_contains": "cannot be empty"
|
||||
},
|
||||
{
|
||||
"name": "case_insensitive_match",
|
||||
"delegating_agent": {
|
||||
"role": "Manager",
|
||||
"allow_delegation": True,
|
||||
"allowed_agents": ["Worker"]
|
||||
},
|
||||
"target_agent": "WORKER",
|
||||
"should_succeed": True
|
||||
},
|
||||
{
|
||||
"name": "unauthorized_delegation",
|
||||
"delegating_agent": {
|
||||
"role": "Manager",
|
||||
"allow_delegation": True,
|
||||
"allowed_agents": ["Worker A"]
|
||||
},
|
||||
"target_agent": "Worker B",
|
||||
"should_succeed": False,
|
||||
"error_contains": "Authorization Error"
|
||||
},
|
||||
{
|
||||
"name": "no_allowed_agents_specified",
|
||||
"delegating_agent": {
|
||||
"role": "Manager",
|
||||
"allow_delegation": True,
|
||||
"allowed_agents": None
|
||||
},
|
||||
"target_agent": "Worker",
|
||||
"should_succeed": True
|
||||
}
|
||||
])
|
||||
def test_delegation_scenarios(scenario):
|
||||
"""Test various delegation scenarios."""
|
||||
# Create agents
|
||||
delegating_agent = Agent(
|
||||
role=scenario["delegating_agent"]["role"],
|
||||
goal="Manage the team",
|
||||
backstory="An experienced manager",
|
||||
allow_delegation=scenario["delegating_agent"]["allow_delegation"],
|
||||
allowed_agents=scenario["delegating_agent"]["allowed_agents"]
|
||||
)
|
||||
target_agent = Agent(
|
||||
role=scenario["target_agent"],
|
||||
goal="Do the work",
|
||||
backstory="A skilled worker",
|
||||
allow_delegation=False
|
||||
)
|
||||
|
||||
# Mock LLM to avoid actual API calls
|
||||
mock_content = "Thought: I will handle this task\nFinal Answer: Task completed successfully"
|
||||
mock_response = {
|
||||
"choices": [{
|
||||
"message": {
|
||||
"content": mock_content
|
||||
}
|
||||
}]
|
||||
}
|
||||
for agent in [delegating_agent, target_agent]:
|
||||
agent.llm = MagicMock()
|
||||
agent.llm.invoke = MagicMock(return_value=mock_response)
|
||||
agent.llm.call = MagicMock(return_value=mock_content)
|
||||
|
||||
# Create crew and tool
|
||||
crew = Crew(agents=[delegating_agent, target_agent])
|
||||
tool = DelegateWorkTool(
|
||||
name="Delegate work to coworker",
|
||||
description="Tool for delegating work to coworkers",
|
||||
agents=[delegating_agent, target_agent],
|
||||
agent_id=delegating_agent.id
|
||||
)
|
||||
|
||||
# Test delegation
|
||||
result = tool._execute(
|
||||
agent_name=scenario["target_agent"],
|
||||
task="Complete task",
|
||||
context="Important task"
|
||||
)
|
||||
|
||||
# Verify results
|
||||
if scenario["should_succeed"]:
|
||||
assert "authorization error" not in result.lower()
|
||||
assert "cannot delegate" not in result.lower()
|
||||
else:
|
||||
assert scenario["error_contains"].lower() in result.lower()
|
||||
Reference in New Issue
Block a user