feat: implement async delegation and spawning for improved agent task handling

- Added `_arun` methods to `DelegateToCoworkerTool`, `MultiDelegateTool`, and `SpawnSubtaskTool` classes to support asynchronous task delegation and spawning, enhancing non-blocking operations.
- Introduced event emissions for delegation and spawning processes, allowing for better tracking of task states and outcomes.
- Implemented error handling and logging for async operations, ensuring robust execution and feedback during agent interactions.

These enhancements aim to optimize the performance and responsiveness of agent task management within the CrewAI framework.
This commit is contained in:
Joao Moura
2026-05-13 04:30:43 -04:00
committed by alex-clawd
parent 22bcced6c0
commit b1c02428c6
3 changed files with 338 additions and 3 deletions

View File

@@ -244,6 +244,106 @@ class DelegateToCoworkerTool(BaseTool):
)
raise
async def _arun(self, message: str, fire_and_forget: bool = False, **kwargs: Any) -> str:
"""Async delegation — avoids blocking the event loop."""
from crewai.new_agent.events import (
NewAgentDelegationCompletedEvent,
NewAgentDelegationFailedEvent,
NewAgentDelegationStartedEvent,
NewAgentFireAndForgetCompletedEvent,
NewAgentFireAndForgetDispatchedEvent,
)
from crewai.new_agent.new_agent import NewAgent
cw_role = getattr(self.coworker, "role", "unknown")
parent_id = getattr(self.parent_agent, "id", "") if self.parent_agent else ""
if self.parent_agent and getattr(self.parent_agent, "on_delegate", None):
self.parent_agent.on_delegate(self.coworker, message)
if not isinstance(self.coworker, NewAgent):
return self._delegate_a2a(message)
if fire_and_forget:
_emit_delegation_event(
NewAgentFireAndForgetDispatchedEvent,
new_agent_id=parent_id,
coworker_role=cw_role,
)
async def _async_ff() -> None:
try:
await self.coworker.amessage(message)
finally:
_emit_delegation_event(
NewAgentFireAndForgetCompletedEvent,
new_agent_id=parent_id,
coworker_role=cw_role,
)
asyncio.get_running_loop().create_task(_async_ff())
return f"Work delegated to {cw_role}. They are working on it in the background."
_emit_delegation_event(
NewAgentDelegationStartedEvent,
new_agent_id=parent_id,
coworker_role=cw_role,
delegation_mode="sync",
coworker_source=self.coworker_source,
)
start = time.monotonic()
try:
response = await self.coworker.amessage(message)
elapsed_ms = int((time.monotonic() - start) * 1000)
in_tokens = getattr(response, "input_tokens", 0) or 0
out_tokens = getattr(response, "output_tokens", 0) or 0
tokens = in_tokens + out_tokens
_emit_delegation_event(
NewAgentDelegationCompletedEvent,
new_agent_id=parent_id,
coworker_role=cw_role,
tokens_consumed=tokens,
response_time_ms=elapsed_ms,
)
if self.parent_agent and tokens > 0:
try:
from crewai.new_agent.models import TokenUsage
executor = getattr(self.parent_agent, "_executor", None)
if executor is not None:
executor._sub_action_tokens.append(
TokenUsage(
action="delegation",
agent_id=str(parent_id),
input_tokens=in_tokens,
output_tokens=out_tokens,
model=getattr(response, "model", "") or "",
delegation_target=cw_role,
coworker_source=self.coworker_source,
)
)
except Exception:
pass
result_content = response.content
summary = _build_provenance_summary(
self.coworker, cw_role, elapsed_ms, in_tokens, out_tokens
)
if summary:
result_content += summary
return result_content
except Exception as e:
_emit_delegation_event(
NewAgentDelegationFailedEvent,
new_agent_id=parent_id,
coworker_role=cw_role,
error=str(e),
)
raise
def _delegate_a2a(self, message: str) -> str:
"""Delegate to an A2A remote coworker."""
try:
@@ -350,6 +450,58 @@ class MultiDelegateTool(BaseTool):
return "\n\n".join(results)
async def _arun(self, delegations: list[dict[str, str]], **kwargs: Any) -> str:
"""Async parallel delegation — avoids blocking the event loop."""
from crewai.new_agent.new_agent import NewAgent
tasks_to_run = []
for d in delegations:
cw_name = d.get("coworker", "")
message = d.get("message", "")
coworker = self.coworker_map.get(cw_name)
if coworker is None:
for role, cw in self.coworker_map.items():
if cw_name.lower() in role.lower():
coworker = cw
break
if coworker is None or not isinstance(coworker, NewAgent):
tasks_to_run.append((cw_name, message, None))
else:
tasks_to_run.append((cw_name, message, coworker))
async def _error_result(name: str) -> str:
return f"[Error] Coworker '{name}' not found."
coros = []
for cw_name, message, coworker in tasks_to_run:
if coworker is None:
coros.append(_error_result(cw_name))
else:
coros.append(coworker.amessage(message))
raw = await asyncio.gather(*coros, return_exceptions=True)
results: list[str] = []
for i, (cw_name, message, coworker) in enumerate(tasks_to_run):
r = raw[i]
if isinstance(r, Exception):
results.append(f"[{cw_name}] Error: {r}")
elif isinstance(r, str):
results.append(f"[{cw_name}] {r}")
else:
content = getattr(r, "content", str(r))
role = cw_name or f"Coworker {i + 1}"
in_tokens = getattr(r, "input_tokens", 0) or 0
out_tokens = getattr(r, "output_tokens", 0) or 0
if coworker is not None:
summary = _build_provenance_summary(
coworker, role, 0, in_tokens, out_tokens
)
if summary:
content += summary
results.append(f"[{role}] {content}")
return "\n\n".join(results)
def build_coworker_tools(
coworkers: list[Any],

View File

@@ -1926,6 +1926,11 @@ class ConversationalAgentExecutor(BaseModel):
result = parse_error.get(
"result", f"Error parsing args for {func_name}"
)
elif original_tool and self._tool_has_arun(original_tool):
if isinstance(parsed_args, dict):
result = await original_tool._arun(**parsed_args)
else:
result = await original_tool._arun(parsed_args)
elif isinstance(parsed_args, dict):
result = (
original_tool._run(**parsed_args)
@@ -2080,6 +2085,19 @@ class ConversationalAgentExecutor(BaseModel):
return None
@staticmethod
def _tool_has_arun(tool: Any) -> bool:
"""Check if a tool has a real async _arun (not the default NotImplementedError stub)."""
arun = getattr(tool, "_arun", None)
if arun is None:
return False
# BaseTool's default _arun raises NotImplementedError — skip it
for cls in type(tool).__mro__:
if "_arun" in cls.__dict__:
return cls.__name__ != "BaseTool" and cls.__name__ != "StructuredTool"
return False
def _parse_tool_call(self, tool_call: Any) -> tuple[str | None, Any, str | None]:
"""Parse a tool call into (func_name, args, call_id)."""
if hasattr(tool_call, "function"):

View File

@@ -277,7 +277,174 @@ class SpawnSubtaskTool(BaseTool):
else:
results = asyncio.run(_run_all())
# Log provenance for each spawn
self._log_spawn_provenance(subtasks, results, spawn_ids)
return "\n\n".join(results)
async def _arun(
self, subtasks: list[str], fire_and_forget: bool = False, **kwargs: Any
) -> str:
"""Async spawn — avoids blocking the event loop."""
from crewai.new_agent.new_agent import NewAgent
if not isinstance(self.agent, NewAgent):
return "Error: spawn tool requires a NewAgent instance."
if not self.agent.settings.can_spawn_copies:
return "Error: this agent is not allowed to spawn copies (can_spawn_copies=False)."
if self.agent.settings.max_spawn_depth < 1:
return "Error: spawn depth exceeded — copies cannot spawn further copies."
settings = self.agent.settings
max_spawns = settings.max_concurrent_spawns
timeout = settings.spawn_timeout
parent_id = str(self.agent.id)
if len(subtasks) > max_spawns:
subtasks = subtasks[:max_spawns]
spawn_ids: list[str] = []
for i, subtask in enumerate(subtasks):
spawn_id = f"spawn-{uuid4().hex[:8]}-{i + 1}"
spawn_ids.append(spawn_id)
try:
from crewai.new_agent.events import NewAgentSpawnStartedEvent
_emit_spawn_event(
NewAgentSpawnStartedEvent,
new_agent_id=parent_id,
spawn_id=spawn_id,
parent_id=parent_id,
spawn_depth=1,
)
except Exception:
pass
from crewai.new_agent.models import AgentSettings as SpawnSettings
spawn_settings = SpawnSettings(
can_spawn_copies=False,
max_spawn_depth=0,
memory_enabled=True,
provenance_enabled=settings.provenance_enabled,
respect_context_window=settings.respect_context_window,
cache_tool_results=settings.cache_tool_results,
narration_guard=settings.narration_guard,
narration_max_retries=settings.narration_max_retries,
)
enriched_messages: list[str] = []
for subtask in subtasks:
context = _query_parent_memory(self.agent, subtask)
if context:
enriched_messages.append(f"{context}\n\nTask: {subtask}")
else:
enriched_messages.append(subtask)
copies: list[NewAgent] = []
for subtask in subtasks:
copy = NewAgent(
role=self.agent.role,
goal=subtask,
backstory="",
llm=self.agent.llm,
tools=list(self.agent.tools),
memory=True,
memory_scope=f"spawn-{parent_id}",
settings=spawn_settings,
verbose=self.agent.verbose,
)
copies.append(copy)
if fire_and_forget:
for copy, msg, sid in zip(copies, enriched_messages, spawn_ids):
async def _bg(c: NewAgent = copy, m: str = msg, s: str = sid) -> None:
try:
await c.amessage(m)
try:
from crewai.new_agent.events import (
NewAgentSpawnCompletedEvent,
)
_emit_spawn_event(
NewAgentSpawnCompletedEvent,
new_agent_id=parent_id,
spawn_id=s,
)
except Exception:
pass
except Exception as e:
try:
from crewai.new_agent.events import (
NewAgentSpawnFailedEvent,
)
_emit_spawn_event(
NewAgentSpawnFailedEvent,
new_agent_id=parent_id,
spawn_id=s,
error=str(e),
)
except Exception:
pass
asyncio.get_running_loop().create_task(_bg())
return f"Dispatched {len(copies)} subtask(s) in the background (fire-and-forget)."
tasks = [
asyncio.wait_for(copy.amessage(msg), timeout=timeout)
for copy, msg in zip(copies, enriched_messages)
]
raw_results = await asyncio.gather(*tasks, return_exceptions=True)
results: list[str] = []
for i, r in enumerate(raw_results):
if isinstance(r, asyncio.TimeoutError):
results.append(f"[Subtask {i + 1}] Timed out after {timeout}s")
try:
from crewai.new_agent.events import NewAgentSpawnFailedEvent
_emit_spawn_event(
NewAgentSpawnFailedEvent,
new_agent_id=parent_id,
spawn_id=spawn_ids[i],
error=f"Timed out after {timeout}s",
)
except Exception:
pass
elif isinstance(r, Exception):
results.append(f"[Subtask {i + 1}] Error: {r}")
try:
from crewai.new_agent.events import NewAgentSpawnFailedEvent
_emit_spawn_event(
NewAgentSpawnFailedEvent,
new_agent_id=parent_id,
spawn_id=spawn_ids[i],
error=str(r),
)
except Exception:
pass
else:
results.append(f"[Subtask {i + 1}] {r.content}")
try:
from crewai.new_agent.events import NewAgentSpawnCompletedEvent
_emit_spawn_event(
NewAgentSpawnCompletedEvent,
new_agent_id=parent_id,
spawn_id=spawn_ids[i],
)
except Exception:
pass
self._log_spawn_provenance(subtasks, results, spawn_ids)
return "\n\n".join(results)
def _log_spawn_provenance(
self, subtasks: list[str], results: list[str], spawn_ids: list[str]
) -> None:
if self.agent.settings.provenance_enabled and hasattr(self.agent, "_executor"):
from crewai.new_agent.models import ProvenanceEntry
@@ -297,5 +464,3 @@ class SpawnSubtaskTool(BaseTool):
outcome=result[:500],
)
)
return "\n\n".join(results)