diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 0eb8c503b..c87e88de7 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -46,6 +46,7 @@ from crewai.events.types.a2a_events import ( A2AConversationCompletedEvent, A2AMessageSentEvent, ) +from crewai.lite_agent_output import LiteAgentOutput if TYPE_CHECKING: @@ -95,11 +96,11 @@ class DelegationState(NamedTuple): def wrap_agent_with_a2a_instance( agent: Agent, extension_registry: ExtensionRegistry | None = None ) -> None: - """Wrap an agent instance's execute_task and aexecute_task methods with A2A support. + """Wrap an agent instance's task execution and kickoff methods with A2A support. - This function modifies the agent instance by wrapping its execute_task - and aexecute_task methods to add A2A delegation capabilities. Should only - be called when the agent has a2a configuration set. + This function modifies the agent instance by wrapping its execute_task, + aexecute_task, kickoff, and kickoff_async methods to add A2A delegation + capabilities. Should only be called when the agent has a2a configuration set. Args: agent: The agent instance to wrap. @@ -166,6 +167,146 @@ def wrap_agent_with_a2a_instance( agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent) ) + original_kickoff = agent.kickoff.__func__ # type: ignore[attr-defined] + original_kickoff_async = agent.kickoff_async.__func__ # type: ignore[attr-defined] + + @wraps(original_kickoff) + def kickoff_with_a2a( + self: Agent, + messages: str | list[Any], + response_format: type[Any] | None = None, + input_files: dict[str, Any] | None = None, + ) -> Any: + """Execute agent kickoff with A2A delegation support.""" + if not self.a2a: + return original_kickoff(self, messages, response_format, input_files) + + a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) + + if not a2a_agents: + return original_kickoff(self, messages, response_format, input_files) + + if isinstance(messages, str): + description = messages + else: + content = next( + (m["content"] for m in reversed(messages) if m["role"] == "user"), + None, + ) + description = content if isinstance(content, str) else "" + + if not description: + return original_kickoff(self, messages, response_format, input_files) + + fake_task = Task( + description=description, + agent=self, + expected_output="Result from A2A delegation", + input_files=input_files or {}, + ) + + def task_to_kickoff_adapter( + self_: Any, task: Task, context: str | None, tools: list[Any] | None + ) -> str: + """Adapt execute_task signature to kickoff for delegation.""" + result: LiteAgentOutput = original_kickoff( + self_, messages, response_format, input_files + ) + return result.raw + + result_str = _execute_task_with_a2a( + self=self, + a2a_agents=a2a_agents, + original_fn=task_to_kickoff_adapter, + task=fake_task, + agent_response_model=agent_response_model, + context=None, + tools=None, + extension_registry=extension_registry, + ) + + return LiteAgentOutput( + raw=result_str, + pydantic=None, + agent_role=self.role, + usage_metrics=None, + messages=[], + ) + + @wraps(original_kickoff_async) + async def kickoff_async_with_a2a( + self: Agent, + messages: str | list[Any], + response_format: type[Any] | None = None, + input_files: dict[str, Any] | None = None, + ) -> Any: + """Execute agent kickoff with A2A delegation support.""" + if not self.a2a: + return await original_kickoff_async( + self, messages, response_format, input_files + ) + + a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) + + if not a2a_agents: + return await original_kickoff_async( + self, messages, response_format, input_files + ) + + if isinstance(messages, str): + description = messages + else: + content = next( + (m["content"] for m in reversed(messages) if m["role"] == "user"), + None, + ) + description = content if isinstance(content, str) else "" + + if not description: + return await original_kickoff_async( + self, messages, response_format, input_files + ) + + fake_task = Task( + description=description, + agent=self, + expected_output="Result from A2A delegation", + input_files=input_files or {}, + ) + + async def task_to_kickoff_adapter( + self_: Any, task: Task, context: str | None, tools: list[Any] | None + ) -> str: + """Adapt execute_task signature to kickoff_async for delegation.""" + result: LiteAgentOutput = await original_kickoff_async( + self_, messages, response_format, input_files + ) + return result.raw + + result_str = await _aexecute_task_with_a2a( + self=self, + a2a_agents=a2a_agents, + original_fn=task_to_kickoff_adapter, + task=fake_task, + agent_response_model=agent_response_model, + context=None, + tools=None, + extension_registry=extension_registry, + ) + + return LiteAgentOutput( + raw=result_str, + pydantic=None, + agent_role=self.role, + usage_metrics=None, + messages=[], + ) + + object.__setattr__(agent, "kickoff", MethodType(kickoff_with_a2a, agent)) + object.__setattr__( + agent, "kickoff_async", MethodType(kickoff_async_with_a2a, agent) + ) + inject_a2a_server_methods(agent)