diff --git a/lib/crewai/src/crewai/new_agent/coworker_tools.py b/lib/crewai/src/crewai/new_agent/coworker_tools.py index 641e69963..c9dcbde8a 100644 --- a/lib/crewai/src/crewai/new_agent/coworker_tools.py +++ b/lib/crewai/src/crewai/new_agent/coworker_tools.py @@ -244,6 +244,106 @@ class DelegateToCoworkerTool(BaseTool): ) raise + async def _arun(self, message: str, fire_and_forget: bool = False, **kwargs: Any) -> str: + """Async delegation — avoids blocking the event loop.""" + from crewai.new_agent.events import ( + NewAgentDelegationCompletedEvent, + NewAgentDelegationFailedEvent, + NewAgentDelegationStartedEvent, + NewAgentFireAndForgetCompletedEvent, + NewAgentFireAndForgetDispatchedEvent, + ) + from crewai.new_agent.new_agent import NewAgent + + cw_role = getattr(self.coworker, "role", "unknown") + parent_id = getattr(self.parent_agent, "id", "") if self.parent_agent else "" + + if self.parent_agent and getattr(self.parent_agent, "on_delegate", None): + self.parent_agent.on_delegate(self.coworker, message) + + if not isinstance(self.coworker, NewAgent): + return self._delegate_a2a(message) + + if fire_and_forget: + _emit_delegation_event( + NewAgentFireAndForgetDispatchedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + ) + + async def _async_ff() -> None: + try: + await self.coworker.amessage(message) + finally: + _emit_delegation_event( + NewAgentFireAndForgetCompletedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + ) + + asyncio.get_running_loop().create_task(_async_ff()) + return f"Work delegated to {cw_role}. They are working on it in the background." + + _emit_delegation_event( + NewAgentDelegationStartedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + delegation_mode="sync", + coworker_source=self.coworker_source, + ) + + start = time.monotonic() + try: + response = await self.coworker.amessage(message) + elapsed_ms = int((time.monotonic() - start) * 1000) + in_tokens = getattr(response, "input_tokens", 0) or 0 + out_tokens = getattr(response, "output_tokens", 0) or 0 + tokens = in_tokens + out_tokens + _emit_delegation_event( + NewAgentDelegationCompletedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + tokens_consumed=tokens, + response_time_ms=elapsed_ms, + ) + + if self.parent_agent and tokens > 0: + try: + from crewai.new_agent.models import TokenUsage + + executor = getattr(self.parent_agent, "_executor", None) + if executor is not None: + executor._sub_action_tokens.append( + TokenUsage( + action="delegation", + agent_id=str(parent_id), + input_tokens=in_tokens, + output_tokens=out_tokens, + model=getattr(response, "model", "") or "", + delegation_target=cw_role, + coworker_source=self.coworker_source, + ) + ) + except Exception: + pass + + result_content = response.content + summary = _build_provenance_summary( + self.coworker, cw_role, elapsed_ms, in_tokens, out_tokens + ) + if summary: + result_content += summary + + return result_content + except Exception as e: + _emit_delegation_event( + NewAgentDelegationFailedEvent, + new_agent_id=parent_id, + coworker_role=cw_role, + error=str(e), + ) + raise + def _delegate_a2a(self, message: str) -> str: """Delegate to an A2A remote coworker.""" try: @@ -350,6 +450,58 @@ class MultiDelegateTool(BaseTool): return "\n\n".join(results) + async def _arun(self, delegations: list[dict[str, str]], **kwargs: Any) -> str: + """Async parallel delegation — avoids blocking the event loop.""" + from crewai.new_agent.new_agent import NewAgent + + tasks_to_run = [] + for d in delegations: + cw_name = d.get("coworker", "") + message = d.get("message", "") + coworker = self.coworker_map.get(cw_name) + if coworker is None: + for role, cw in self.coworker_map.items(): + if cw_name.lower() in role.lower(): + coworker = cw + break + if coworker is None or not isinstance(coworker, NewAgent): + tasks_to_run.append((cw_name, message, None)) + else: + tasks_to_run.append((cw_name, message, coworker)) + + async def _error_result(name: str) -> str: + return f"[Error] Coworker '{name}' not found." + + coros = [] + for cw_name, message, coworker in tasks_to_run: + if coworker is None: + coros.append(_error_result(cw_name)) + else: + coros.append(coworker.amessage(message)) + raw = await asyncio.gather(*coros, return_exceptions=True) + + results: list[str] = [] + for i, (cw_name, message, coworker) in enumerate(tasks_to_run): + r = raw[i] + if isinstance(r, Exception): + results.append(f"[{cw_name}] Error: {r}") + elif isinstance(r, str): + results.append(f"[{cw_name}] {r}") + else: + content = getattr(r, "content", str(r)) + role = cw_name or f"Coworker {i + 1}" + in_tokens = getattr(r, "input_tokens", 0) or 0 + out_tokens = getattr(r, "output_tokens", 0) or 0 + if coworker is not None: + summary = _build_provenance_summary( + coworker, role, 0, in_tokens, out_tokens + ) + if summary: + content += summary + results.append(f"[{role}] {content}") + + return "\n\n".join(results) + def build_coworker_tools( coworkers: list[Any], diff --git a/lib/crewai/src/crewai/new_agent/executor.py b/lib/crewai/src/crewai/new_agent/executor.py index 0809c2dab..d9aaa6565 100644 --- a/lib/crewai/src/crewai/new_agent/executor.py +++ b/lib/crewai/src/crewai/new_agent/executor.py @@ -1926,6 +1926,11 @@ class ConversationalAgentExecutor(BaseModel): result = parse_error.get( "result", f"Error parsing args for {func_name}" ) + elif original_tool and self._tool_has_arun(original_tool): + if isinstance(parsed_args, dict): + result = await original_tool._arun(**parsed_args) + else: + result = await original_tool._arun(parsed_args) elif isinstance(parsed_args, dict): result = ( original_tool._run(**parsed_args) @@ -2080,6 +2085,19 @@ class ConversationalAgentExecutor(BaseModel): return None + @staticmethod + def _tool_has_arun(tool: Any) -> bool: + """Check if a tool has a real async _arun (not the default NotImplementedError stub).""" + arun = getattr(tool, "_arun", None) + if arun is None: + return False + # BaseTool's default _arun raises NotImplementedError — skip it + for cls in type(tool).__mro__: + if "_arun" in cls.__dict__: + return cls.__name__ != "BaseTool" and cls.__name__ != "StructuredTool" + + return False + def _parse_tool_call(self, tool_call: Any) -> tuple[str | None, Any, str | None]: """Parse a tool call into (func_name, args, call_id).""" if hasattr(tool_call, "function"): diff --git a/lib/crewai/src/crewai/new_agent/spawn_tools.py b/lib/crewai/src/crewai/new_agent/spawn_tools.py index 360ebfc13..3eac5a996 100644 --- a/lib/crewai/src/crewai/new_agent/spawn_tools.py +++ b/lib/crewai/src/crewai/new_agent/spawn_tools.py @@ -277,7 +277,174 @@ class SpawnSubtaskTool(BaseTool): else: results = asyncio.run(_run_all()) - # Log provenance for each spawn + self._log_spawn_provenance(subtasks, results, spawn_ids) + return "\n\n".join(results) + + async def _arun( + self, subtasks: list[str], fire_and_forget: bool = False, **kwargs: Any + ) -> str: + """Async spawn — avoids blocking the event loop.""" + from crewai.new_agent.new_agent import NewAgent + + if not isinstance(self.agent, NewAgent): + return "Error: spawn tool requires a NewAgent instance." + + if not self.agent.settings.can_spawn_copies: + return "Error: this agent is not allowed to spawn copies (can_spawn_copies=False)." + + if self.agent.settings.max_spawn_depth < 1: + return "Error: spawn depth exceeded — copies cannot spawn further copies." + + settings = self.agent.settings + max_spawns = settings.max_concurrent_spawns + timeout = settings.spawn_timeout + parent_id = str(self.agent.id) + + if len(subtasks) > max_spawns: + subtasks = subtasks[:max_spawns] + + spawn_ids: list[str] = [] + for i, subtask in enumerate(subtasks): + spawn_id = f"spawn-{uuid4().hex[:8]}-{i + 1}" + spawn_ids.append(spawn_id) + try: + from crewai.new_agent.events import NewAgentSpawnStartedEvent + + _emit_spawn_event( + NewAgentSpawnStartedEvent, + new_agent_id=parent_id, + spawn_id=spawn_id, + parent_id=parent_id, + spawn_depth=1, + ) + except Exception: + pass + + from crewai.new_agent.models import AgentSettings as SpawnSettings + + spawn_settings = SpawnSettings( + can_spawn_copies=False, + max_spawn_depth=0, + memory_enabled=True, + provenance_enabled=settings.provenance_enabled, + respect_context_window=settings.respect_context_window, + cache_tool_results=settings.cache_tool_results, + narration_guard=settings.narration_guard, + narration_max_retries=settings.narration_max_retries, + ) + + enriched_messages: list[str] = [] + for subtask in subtasks: + context = _query_parent_memory(self.agent, subtask) + if context: + enriched_messages.append(f"{context}\n\nTask: {subtask}") + else: + enriched_messages.append(subtask) + + copies: list[NewAgent] = [] + for subtask in subtasks: + copy = NewAgent( + role=self.agent.role, + goal=subtask, + backstory="", + llm=self.agent.llm, + tools=list(self.agent.tools), + memory=True, + memory_scope=f"spawn-{parent_id}", + settings=spawn_settings, + verbose=self.agent.verbose, + ) + copies.append(copy) + + if fire_and_forget: + for copy, msg, sid in zip(copies, enriched_messages, spawn_ids): + + async def _bg(c: NewAgent = copy, m: str = msg, s: str = sid) -> None: + try: + await c.amessage(m) + try: + from crewai.new_agent.events import ( + NewAgentSpawnCompletedEvent, + ) + + _emit_spawn_event( + NewAgentSpawnCompletedEvent, + new_agent_id=parent_id, + spawn_id=s, + ) + except Exception: + pass + except Exception as e: + try: + from crewai.new_agent.events import ( + NewAgentSpawnFailedEvent, + ) + + _emit_spawn_event( + NewAgentSpawnFailedEvent, + new_agent_id=parent_id, + spawn_id=s, + error=str(e), + ) + except Exception: + pass + + asyncio.get_running_loop().create_task(_bg()) + + return f"Dispatched {len(copies)} subtask(s) in the background (fire-and-forget)." + + tasks = [ + asyncio.wait_for(copy.amessage(msg), timeout=timeout) + for copy, msg in zip(copies, enriched_messages) + ] + raw_results = await asyncio.gather(*tasks, return_exceptions=True) + results: list[str] = [] + for i, r in enumerate(raw_results): + if isinstance(r, asyncio.TimeoutError): + results.append(f"[Subtask {i + 1}] Timed out after {timeout}s") + try: + from crewai.new_agent.events import NewAgentSpawnFailedEvent + + _emit_spawn_event( + NewAgentSpawnFailedEvent, + new_agent_id=parent_id, + spawn_id=spawn_ids[i], + error=f"Timed out after {timeout}s", + ) + except Exception: + pass + elif isinstance(r, Exception): + results.append(f"[Subtask {i + 1}] Error: {r}") + try: + from crewai.new_agent.events import NewAgentSpawnFailedEvent + + _emit_spawn_event( + NewAgentSpawnFailedEvent, + new_agent_id=parent_id, + spawn_id=spawn_ids[i], + error=str(r), + ) + except Exception: + pass + else: + results.append(f"[Subtask {i + 1}] {r.content}") + try: + from crewai.new_agent.events import NewAgentSpawnCompletedEvent + + _emit_spawn_event( + NewAgentSpawnCompletedEvent, + new_agent_id=parent_id, + spawn_id=spawn_ids[i], + ) + except Exception: + pass + + self._log_spawn_provenance(subtasks, results, spawn_ids) + return "\n\n".join(results) + + def _log_spawn_provenance( + self, subtasks: list[str], results: list[str], spawn_ids: list[str] + ) -> None: if self.agent.settings.provenance_enabled and hasattr(self.agent, "_executor"): from crewai.new_agent.models import ProvenanceEntry @@ -297,5 +464,3 @@ class SpawnSubtaskTool(BaseTool): outcome=result[:500], ) ) - - return "\n\n".join(results)