refactor: enhance delegation handling with improved logging and validation (inspired by crewai review agent)

This commit is contained in:
Vardaan Grover
2025-02-09 21:19:25 +05:30
parent ea740fa4e1
commit b2c01c9ffc
3 changed files with 89 additions and 34 deletions

View File

@@ -258,10 +258,21 @@ class BaseAgent(ABC, BaseModel):
pass
@field_validator('allowed_agents')
def validate_allowed_agents(cls, v):
if v is None:
def validate_allowed_agents(cls, v: Optional[List[Union[str, 'BaseAgent']]]):
"""Validate and process the allowed agents provided.
This method ensures that each allowed agent is either a string or a BaseAgent instance.
If all agents meet this criteria, the list is returned as-is. Otherwise, a ValueError is raised.
Args:
v (Optional[List[Union[str, 'BaseAgent']]]): A list of strings or BaseAgent instances
representing the allowed agents, or None.
Returns:
Optional[List[Union[str, 'BaseAgent']]]: The validated list of allowed agents if valid.
Raises:
ValueError: If any element in the list is not a string or BaseAgent instance."""
if v is None or all(isinstance(agent, (str, BaseAgent)) for agent in v):
return v
return [agent.role if isinstance(agent, BaseAgent) else agent for agent in v]
raise ValueError("allowed_agents must be a list of strings or BaseAgent instances.")
@abstractmethod
def get_output_converter(

View File

@@ -2,6 +2,7 @@ import asyncio
import json
import re
import uuid
import logging
import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
@@ -57,6 +58,7 @@ try:
except ImportError:
agentops = None
logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -860,37 +862,79 @@ class Crew(BaseModel):
code_tools = agent.get_code_execution_tools()
return self._merge_tools(tools, code_tools)
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
# agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
# if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
# if not tools:
# tools = []
# tools = self._inject_delegation_tools(
# tools, task.agent, agents_for_delegation
# )
# return tools
print(f"Current agent: {task.agent.role}")
print(f"Allow delegation: {task.agent.allow_delegation}")
print(f"Allowed agents: {task.agent.allowed_agents}")
if not task.agent or not task.agent.allow_delegation:
print("Delegation not allowed for this agent")
return tools
def _add_delegation_tools(self, task: Task, tools: List[Tool]) -> List[Tool]:
"""Add delegation tools to the agent's toolkit.
agents_for_delegation = []
for agent in self.agents:
if agent == task.agent:
continue
if task.agent.allowed_agents is None or agent.role in task.agent.allowed_agents:
agents_for_delegation.append(agent)
print(f"Added {agent.role} to delegation list")
if agents_for_delegation:
if not tools:
tools = []
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
)
Args:
task: The task requiring delegation tools
tools: Existing tools list
Returns:
Updated tools list with delegation capabilities
Raises:
TaskDelegationError: If delegation tool setup fails
"""
try:
return self._setup_agent_delegation(task, tools)
except Exception as e:
logger.error(f"Failed to add delegation tools: {str(e)}")
def _setup_agent_delegation(self, task: Task, tools: List[Tool]) -> List[Tool]:
"""Configure delegation tools for an agent.
Args:
task: The task requiring delegation
tools: Existing tools list
Returns:
Updated tools list
"""
if not task.agent or not task.agent.allow_delegation:
logger.debug(f"Delegation not allowed for agent: {task.agent.role if task.agent else 'None'}")
return tools
delegation_candidates = self._get_delegation_candidates(task.agent)
if delegation_candidates:
logger.debug(f"Setting up delegation tools for agent {task.agent.role}")
return self._inject_delegation_tools(tools, task.agent, delegation_candidates)
return tools
def _get_delegation_candidates(self, agent: BaseAgent) -> List[BaseAgent]:
"""Get list of agents that can be delegated to.
Args:
agent: The agent requesting delegation
Returns:
List of available agents for delegation
"""
candidates = []
for candidate in self.agents:
if candidate == agent:
continue
if self._can_delegate_to(agent, candidate):
logger.debug(f"Adding {candidate.role} as delegation candidate")
candidates.append(candidate)
return candidates
def _can_delegate_to(self, delegator: BaseAgent, target: BaseAgent) -> bool:
"""Check if an agent can delegate to another agent.
Args:
delegator: Agent attempting to delegate
target: Potential delegation target
Returns:
True if delegation is allowed, False otherwise
"""
if delegator.allowed_agents is None:
return True
return target.role in delegator.allowed_agents or target in delegator.allowed_agents
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:
self._file_handler.log(

View File

@@ -53,7 +53,7 @@ class BaseAgentTool(BaseTool):
context: Optional[str] = None
) -> str:
try:
print("\n=== Delegating Work ===")
logger.debug("\n=== Delegating Work ===")
if agent_name is None:
agent_name = ""
@@ -72,7 +72,7 @@ class BaseAgentTool(BaseTool):
error=f"No agent found with role '{sanitized_name}'"
)
print(f"Delegating task to: {target_agent.role}")
logger.debug(f"Delegating task to: {target_agent.role}")
new_task = Task(
description=task,
@@ -88,7 +88,7 @@ class BaseAgentTool(BaseTool):
)
result = target_agent.execute_task(new_task, context, tools)
print("\n=== Delegation Complete ===")
logger.debug("\n=== Delegation Complete ===")
return result
except Exception as e: