feat: liteagent a2a delegation support to kickoff methods

This commit is contained in:
Greyson LaLonde
2026-01-29 19:41:36 -05:00
parent 8cf483712b
commit f1b672167d

View File

@@ -46,6 +46,7 @@ from crewai.events.types.a2a_events import (
A2AConversationCompletedEvent,
A2AMessageSentEvent,
)
from crewai.lite_agent_output import LiteAgentOutput
if TYPE_CHECKING:
@@ -95,11 +96,11 @@ class DelegationState(NamedTuple):
def wrap_agent_with_a2a_instance(
agent: Agent, extension_registry: ExtensionRegistry | None = None
) -> None:
"""Wrap an agent instance's execute_task and aexecute_task methods with A2A support.
"""Wrap an agent instance's task execution and kickoff methods with A2A support.
This function modifies the agent instance by wrapping its execute_task
and aexecute_task methods to add A2A delegation capabilities. Should only
be called when the agent has a2a configuration set.
This function modifies the agent instance by wrapping its execute_task,
aexecute_task, kickoff, and kickoff_async methods to add A2A delegation
capabilities. Should only be called when the agent has a2a configuration set.
Args:
agent: The agent instance to wrap.
@@ -166,6 +167,146 @@ def wrap_agent_with_a2a_instance(
agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent)
)
original_kickoff = agent.kickoff.__func__ # type: ignore[attr-defined]
original_kickoff_async = agent.kickoff_async.__func__ # type: ignore[attr-defined]
@wraps(original_kickoff)
def kickoff_with_a2a(
self: Agent,
messages: str | list[Any],
response_format: type[Any] | None = None,
input_files: dict[str, Any] | None = None,
) -> Any:
"""Execute agent kickoff with A2A delegation support."""
if not self.a2a:
return original_kickoff(self, messages, response_format, input_files)
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
if not a2a_agents:
return original_kickoff(self, messages, response_format, input_files)
if isinstance(messages, str):
description = messages
else:
content = next(
(m["content"] for m in reversed(messages) if m["role"] == "user"),
None,
)
description = content if isinstance(content, str) else ""
if not description:
return original_kickoff(self, messages, response_format, input_files)
fake_task = Task(
description=description,
agent=self,
expected_output="Result from A2A delegation",
input_files=input_files or {},
)
def task_to_kickoff_adapter(
self_: Any, task: Task, context: str | None, tools: list[Any] | None
) -> str:
"""Adapt execute_task signature to kickoff for delegation."""
result: LiteAgentOutput = original_kickoff(
self_, messages, response_format, input_files
)
return result.raw
result_str = _execute_task_with_a2a(
self=self,
a2a_agents=a2a_agents,
original_fn=task_to_kickoff_adapter,
task=fake_task,
agent_response_model=agent_response_model,
context=None,
tools=None,
extension_registry=extension_registry,
)
return LiteAgentOutput(
raw=result_str,
pydantic=None,
agent_role=self.role,
usage_metrics=None,
messages=[],
)
@wraps(original_kickoff_async)
async def kickoff_async_with_a2a(
self: Agent,
messages: str | list[Any],
response_format: type[Any] | None = None,
input_files: dict[str, Any] | None = None,
) -> Any:
"""Execute agent kickoff with A2A delegation support."""
if not self.a2a:
return await original_kickoff_async(
self, messages, response_format, input_files
)
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
if not a2a_agents:
return await original_kickoff_async(
self, messages, response_format, input_files
)
if isinstance(messages, str):
description = messages
else:
content = next(
(m["content"] for m in reversed(messages) if m["role"] == "user"),
None,
)
description = content if isinstance(content, str) else ""
if not description:
return await original_kickoff_async(
self, messages, response_format, input_files
)
fake_task = Task(
description=description,
agent=self,
expected_output="Result from A2A delegation",
input_files=input_files or {},
)
async def task_to_kickoff_adapter(
self_: Any, task: Task, context: str | None, tools: list[Any] | None
) -> str:
"""Adapt execute_task signature to kickoff_async for delegation."""
result: LiteAgentOutput = await original_kickoff_async(
self_, messages, response_format, input_files
)
return result.raw
result_str = await _aexecute_task_with_a2a(
self=self,
a2a_agents=a2a_agents,
original_fn=task_to_kickoff_adapter,
task=fake_task,
agent_response_model=agent_response_model,
context=None,
tools=None,
extension_registry=extension_registry,
)
return LiteAgentOutput(
raw=result_str,
pydantic=None,
agent_role=self.role,
usage_metrics=None,
messages=[],
)
object.__setattr__(agent, "kickoff", MethodType(kickoff_with_a2a, agent))
object.__setattr__(
agent, "kickoff_async", MethodType(kickoff_async_with_a2a, agent)
)
inject_a2a_server_methods(agent)