From b1c02428c685d668cfeee32242a15152ff144a3b Mon Sep 17 00:00:00 2001 From: Joao Moura Date: Wed, 13 May 2026 04:30:43 -0400 Subject: [PATCH] feat: implement async delegation and spawning for improved agent task handling - Added `_arun` methods to `DelegateToCoworkerTool`, `MultiDelegateTool`, and `SpawnSubtaskTool` classes to support asynchronous task delegation and spawning, enhancing non-blocking operations. - Introduced event emissions for delegation and spawning processes, allowing for better tracking of task states and outcomes. - Implemented error handling and logging for async operations, ensuring robust execution and feedback during agent interactions. These enhancements aim to optimize the performance and responsiveness of agent task management within the CrewAI framework. --- .../src/crewai/new_agent/coworker_tools.py | 152 ++++++++++++++++ lib/crewai/src/crewai/new_agent/executor.py | 18 ++ .../src/crewai/new_agent/spawn_tools.py | 171 +++++++++++++++++- 3 files changed, 338 insertions(+), 3 deletions(-) diff --git a/lib/crewai/src/crewai/new_agent/coworker_tools.py b/lib/crewai/src/crewai/new_agent/coworker_tools.py index 641e69963..c9dcbde8a 100644 --- a/lib/crewai/src/crewai/new_agent/coworker_tools.py +++ b/lib/crewai/src/crewai/new_agent/coworker_tools.py @@ -244,6 +244,106 @@ class DelegateToCoworkerTool(BaseTool): ) raise + async def _arun(self, message: str, fire_and_forget: bool = False, **kwargs: Any) -> str: + """Async delegation — avoids blocking the event loop.""" + from crewai.new_agent.events import ( + NewAgentDelegationCompletedEvent, + NewAgentDelegationFailedEvent, + NewAgentDelegationStartedEvent, + NewAgentFireAndForgetCompletedEvent, + NewAgentFireAndForgetDispatchedEvent, + ) + from crewai.new_agent.new_agent import NewAgent + + cw_role = getattr(self.coworker, "role", "unknown") + parent_id = getattr(self.parent_agent, "id", "") if self.parent_agent else "" + + if self.parent_agent and getattr(self.parent_agent, "on_delegate", None): + self.parent_agent.on_delegate(self.coworker, message) + + if not isinstance(self.coworker, NewAgent): + return self._delegate_a2a(message) + + if fire_and_forget: + _emit_delegation_event( + NewAgentFireAndForgetDispatchedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + ) + + async def _async_ff() -> None: + try: + await self.coworker.amessage(message) + finally: + _emit_delegation_event( + NewAgentFireAndForgetCompletedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + ) + + asyncio.get_running_loop().create_task(_async_ff()) + return f"Work delegated to {cw_role}. They are working on it in the background." + + _emit_delegation_event( + NewAgentDelegationStartedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + delegation_mode="sync", + coworker_source=self.coworker_source, + ) + + start = time.monotonic() + try: + response = await self.coworker.amessage(message) + elapsed_ms = int((time.monotonic() - start) * 1000) + in_tokens = getattr(response, "input_tokens", 0) or 0 + out_tokens = getattr(response, "output_tokens", 0) or 0 + tokens = in_tokens + out_tokens + _emit_delegation_event( + NewAgentDelegationCompletedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + tokens_consumed=tokens, + response_time_ms=elapsed_ms, + ) + + if self.parent_agent and tokens > 0: + try: + from crewai.new_agent.models import TokenUsage + + executor = getattr(self.parent_agent, "_executor", None) + if executor is not None: + executor._sub_action_tokens.append( + TokenUsage( + action="delegation", + agent_id=str(parent_id), + input_tokens=in_tokens, + output_tokens=out_tokens, + model=getattr(response, "model", "") or "", + delegation_target=cw_role, + coworker_source=self.coworker_source, + ) + ) + except Exception: + pass + + result_content = response.content + summary = _build_provenance_summary( + self.coworker, cw_role, elapsed_ms, in_tokens, out_tokens + ) + if summary: + result_content += summary + + return result_content + except Exception as e: + _emit_delegation_event( + NewAgentDelegationFailedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + error=str(e), + ) + raise + def _delegate_a2a(self, message: str) -> str: """Delegate to an A2A remote coworker.""" try: @@ -350,6 +450,58 @@ class MultiDelegateTool(BaseTool): return "\n\n".join(results) + async def _arun(self, delegations: list[dict[str, str]], **kwargs: Any) -> str: + """Async parallel delegation — avoids blocking the event loop.""" + from crewai.new_agent.new_agent import NewAgent + + tasks_to_run = [] + for d in delegations: + cw_name = d.get("coworker", "") + message = d.get("message", "") + coworker = self.coworker_map.get(cw_name) + if coworker is None: + for role, cw in self.coworker_map.items(): + if cw_name.lower() in role.lower(): + coworker = cw + break + if coworker is None or not isinstance(coworker, NewAgent): + tasks_to_run.append((cw_name, message, None)) + else: + tasks_to_run.append((cw_name, message, coworker)) + + async def _error_result(name: str) -> str: + return f"[Error] Coworker '{name}' not found." + + coros = [] + for cw_name, message, coworker in tasks_to_run: + if coworker is None: + coros.append(_error_result(cw_name)) + else: + coros.append(coworker.amessage(message)) + raw = await asyncio.gather(*coros, return_exceptions=True) + + results: list[str] = [] + for i, (cw_name, message, coworker) in enumerate(tasks_to_run): + r = raw[i] + if isinstance(r, Exception): + results.append(f"[{cw_name}] Error: {r}") + elif isinstance(r, str): + results.append(f"[{cw_name}] {r}") + else: + content = getattr(r, "content", str(r)) + role = cw_name or f"Coworker {i + 1}" + in_tokens = getattr(r, "input_tokens", 0) or 0 + out_tokens = getattr(r, "output_tokens", 0) or 0 + if coworker is not None: + summary = _build_provenance_summary( + coworker, role, 0, in_tokens, out_tokens + ) + if summary: + content += summary + results.append(f"[{role}] {content}") + + return "\n\n".join(results) + def build_coworker_tools( coworkers: list[Any], diff --git a/lib/crewai/src/crewai/new_agent/executor.py b/lib/crewai/src/crewai/new_agent/executor.py index 0809c2dab..d9aaa6565 100644 --- a/lib/crewai/src/crewai/new_agent/executor.py +++ b/lib/crewai/src/crewai/new_agent/executor.py @@ -1926,6 +1926,11 @@ class ConversationalAgentExecutor(BaseModel): result = parse_error.get( "result", f"Error parsing args for {func_name}" ) + elif original_tool and self._tool_has_arun(original_tool): + if isinstance(parsed_args, dict): + result = await original_tool._arun(**parsed_args) + else: + result = await original_tool._arun(parsed_args) elif isinstance(parsed_args, dict): result = ( original_tool._run(**parsed_args) @@ -2080,6 +2085,19 @@ class ConversationalAgentExecutor(BaseModel): return None + @staticmethod + def _tool_has_arun(tool: Any) -> bool: + """Check if a tool has a real async _arun (not the default NotImplementedError stub).""" + arun = getattr(tool, "_arun", None) + if arun is None: + return False + # BaseTool's default _arun raises NotImplementedError — skip it + for cls in type(tool).__mro__: + if "_arun" in cls.__dict__: + return cls.__name__ != "BaseTool" and cls.__name__ != "StructuredTool" + + return False + def _parse_tool_call(self, tool_call: Any) -> tuple[str | None, Any, str | None]: """Parse a tool call into (func_name, args, call_id).""" if hasattr(tool_call, "function"): diff --git a/lib/crewai/src/crewai/new_agent/spawn_tools.py b/lib/crewai/src/crewai/new_agent/spawn_tools.py index 360ebfc13..3eac5a996 100644 --- a/lib/crewai/src/crewai/new_agent/spawn_tools.py +++ b/lib/crewai/src/crewai/new_agent/spawn_tools.py @@ -277,7 +277,174 @@ class SpawnSubtaskTool(BaseTool): else: results = asyncio.run(_run_all()) - # Log provenance for each spawn + self._log_spawn_provenance(subtasks, results, spawn_ids) + return "\n\n".join(results) + + async def _arun( + self, subtasks: list[str], fire_and_forget: bool = False, **kwargs: Any + ) -> str: + """Async spawn — avoids blocking the event loop.""" + from crewai.new_agent.new_agent import NewAgent + + if not isinstance(self.agent, NewAgent): + return "Error: spawn tool requires a NewAgent instance." + + if not self.agent.settings.can_spawn_copies: + return "Error: this agent is not allowed to spawn copies (can_spawn_copies=False)." + + if self.agent.settings.max_spawn_depth < 1: + return "Error: spawn depth exceeded — copies cannot spawn further copies." + + settings = self.agent.settings + max_spawns = settings.max_concurrent_spawns + timeout = settings.spawn_timeout + parent_id = str(self.agent.id) + + if len(subtasks) > max_spawns: + subtasks = subtasks[:max_spawns] + + spawn_ids: list[str] = [] + for i, subtask in enumerate(subtasks): + spawn_id = f"spawn-{uuid4().hex[:8]}-{i + 1}" + spawn_ids.append(spawn_id) + try: + from crewai.new_agent.events import NewAgentSpawnStartedEvent + + _emit_spawn_event( + NewAgentSpawnStartedEvent, + new_agent_id=parent_id, + spawn_id=spawn_id, + parent_id=parent_id, + spawn_depth=1, + ) + except Exception: + pass + + from crewai.new_agent.models import AgentSettings as SpawnSettings + + spawn_settings = SpawnSettings( + can_spawn_copies=False, + max_spawn_depth=0, + memory_enabled=True, + provenance_enabled=settings.provenance_enabled, + respect_context_window=settings.respect_context_window, + cache_tool_results=settings.cache_tool_results, + narration_guard=settings.narration_guard, + narration_max_retries=settings.narration_max_retries, + ) + + enriched_messages: list[str] = [] + for subtask in subtasks: + context = _query_parent_memory(self.agent, subtask) + if context: + enriched_messages.append(f"{context}\n\nTask: {subtask}") + else: + enriched_messages.append(subtask) + + copies: list[NewAgent] = [] + for subtask in subtasks: + copy = NewAgent( + role=self.agent.role, + goal=subtask, + backstory="", + llm=self.agent.llm, + tools=list(self.agent.tools), + memory=True, + memory_scope=f"spawn-{parent_id}", + settings=spawn_settings, + verbose=self.agent.verbose, + ) + copies.append(copy) + + if fire_and_forget: + for copy, msg, sid in zip(copies, enriched_messages, spawn_ids): + + async def _bg(c: NewAgent = copy, m: str = msg, s: str = sid) -> None: + try: + await c.amessage(m) + try: + from crewai.new_agent.events import ( + NewAgentSpawnCompletedEvent, + ) + + _emit_spawn_event( + NewAgentSpawnCompletedEvent, + new_agent_id=parent_id, + spawn_id=s, + ) + except Exception: + pass + except Exception as e: + try: + from crewai.new_agent.events import ( + NewAgentSpawnFailedEvent, + ) + + _emit_spawn_event( + NewAgentSpawnFailedEvent, + new_agent_id=parent_id, + spawn_id=s, + error=str(e), + ) + except Exception: + pass + + asyncio.get_running_loop().create_task(_bg()) + + return f"Dispatched {len(copies)} subtask(s) in the background (fire-and-forget)." + + tasks = [ + asyncio.wait_for(copy.amessage(msg), timeout=timeout) + for copy, msg in zip(copies, enriched_messages) + ] + raw_results = await asyncio.gather(*tasks, return_exceptions=True) + results: list[str] = [] + for i, r in enumerate(raw_results): + if isinstance(r, asyncio.TimeoutError): + results.append(f"[Subtask {i + 1}] Timed out after {timeout}s") + try: + from crewai.new_agent.events import NewAgentSpawnFailedEvent + + _emit_spawn_event( + NewAgentSpawnFailedEvent, + new_agent_id=parent_id, + spawn_id=spawn_ids[i], + error=f"Timed out after {timeout}s", + ) + except Exception: + pass + elif isinstance(r, Exception): + results.append(f"[Subtask {i + 1}] Error: {r}") + try: + from crewai.new_agent.events import NewAgentSpawnFailedEvent + + _emit_spawn_event( + NewAgentSpawnFailedEvent, + new_agent_id=parent_id, + spawn_id=spawn_ids[i], + error=str(r), + ) + except Exception: + pass + else: + results.append(f"[Subtask {i + 1}] {r.content}") + try: + from crewai.new_agent.events import NewAgentSpawnCompletedEvent + + _emit_spawn_event( + NewAgentSpawnCompletedEvent, + new_agent_id=parent_id, + spawn_id=spawn_ids[i], + ) + except Exception: + pass + + self._log_spawn_provenance(subtasks, results, spawn_ids) + return "\n\n".join(results) + + def _log_spawn_provenance( + self, subtasks: list[str], results: list[str], spawn_ids: list[str] + ) -> None: if self.agent.settings.provenance_enabled and hasattr(self.agent, "_executor"): from crewai.new_agent.models import ProvenanceEntry @@ -297,5 +464,3 @@ class SpawnSubtaskTool(BaseTool): outcome=result[:500], ) ) - - return "\n\n".join(results)