chore: refactor a2a agent kickoff delegation and event output

This commit is contained in:
Greyson LaLonde
2026-01-29 22:29:47 -05:00
parent f1b672167d
commit 7fa42d63a4
3 changed files with 397 additions and 133 deletions

View File

@@ -47,13 +47,13 @@ from crewai.events.types.a2a_events import (
A2AMessageSentEvent,
)
from crewai.lite_agent_output import LiteAgentOutput
from crewai.task import Task
if TYPE_CHECKING:
from a2a.types import AgentCard, Message
from crewai.agent.core import Agent
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
@@ -186,53 +186,17 @@ def wrap_agent_with_a2a_instance(
if not a2a_agents:
return original_kickoff(self, messages, response_format, input_files)
if isinstance(messages, str):
description = messages
else:
content = next(
(m["content"] for m in reversed(messages) if m["role"] == "user"),
None,
)
description = content if isinstance(content, str) else ""
if not description:
return original_kickoff(self, messages, response_format, input_files)
fake_task = Task(
description=description,
agent=self,
expected_output="Result from A2A delegation",
input_files=input_files or {},
)
def task_to_kickoff_adapter(
self_: Any, task: Task, context: str | None, tools: list[Any] | None
) -> str:
"""Adapt execute_task signature to kickoff for delegation."""
result: LiteAgentOutput = original_kickoff(
self_, messages, response_format, input_files
)
return result.raw
result_str = _execute_task_with_a2a(
return _kickoff_with_a2a(
self=self,
a2a_agents=a2a_agents,
original_fn=task_to_kickoff_adapter,
task=fake_task,
original_kickoff=original_kickoff,
messages=messages,
response_format=response_format,
input_files=input_files,
agent_response_model=agent_response_model,
context=None,
tools=None,
extension_registry=extension_registry,
)
return LiteAgentOutput(
raw=result_str,
pydantic=None,
agent_role=self.role,
usage_metrics=None,
messages=[],
)
@wraps(original_kickoff_async)
async def kickoff_async_with_a2a(
self: Agent,
@@ -253,55 +217,17 @@ def wrap_agent_with_a2a_instance(
self, messages, response_format, input_files
)
if isinstance(messages, str):
description = messages
else:
content = next(
(m["content"] for m in reversed(messages) if m["role"] == "user"),
None,
)
description = content if isinstance(content, str) else ""
if not description:
return await original_kickoff_async(
self, messages, response_format, input_files
)
fake_task = Task(
description=description,
agent=self,
expected_output="Result from A2A delegation",
input_files=input_files or {},
)
async def task_to_kickoff_adapter(
self_: Any, task: Task, context: str | None, tools: list[Any] | None
) -> str:
"""Adapt execute_task signature to kickoff_async for delegation."""
result: LiteAgentOutput = await original_kickoff_async(
self_, messages, response_format, input_files
)
return result.raw
result_str = await _aexecute_task_with_a2a(
return await _akickoff_with_a2a(
self=self,
a2a_agents=a2a_agents,
original_fn=task_to_kickoff_adapter,
task=fake_task,
original_kickoff_async=original_kickoff_async,
messages=messages,
response_format=response_format,
input_files=input_files,
agent_response_model=agent_response_model,
context=None,
tools=None,
extension_registry=extension_registry,
)
return LiteAgentOutput(
raw=result_str,
pydantic=None,
agent_role=self.role,
usage_metrics=None,
messages=[],
)
object.__setattr__(agent, "kickoff", MethodType(kickoff_with_a2a, agent))
object.__setattr__(
agent, "kickoff_async", MethodType(kickoff_async_with_a2a, agent)
@@ -451,15 +377,263 @@ def _execute_task_with_a2a(
original_task_description=original_description,
_extension_registry=extension_registry,
)
return str(agent_response.message)
task.output_pydantic = None
return agent_response.message
return raw_result
finally:
task.description = original_description
task.output_pydantic = original_output_pydantic
if task.output_pydantic is not None:
task.output_pydantic = original_output_pydantic
task.response_model = original_response_model
def _kickoff_with_a2a(
self: Agent,
a2a_agents: list[A2AConfig | A2AClientConfig],
original_kickoff: Callable[..., LiteAgentOutput],
messages: str | list[Any],
response_format: type[Any] | None,
input_files: dict[str, Any] | None,
agent_response_model: type[BaseModel] | None,
extension_registry: ExtensionRegistry,
) -> LiteAgentOutput:
"""Execute kickoff with A2A delegation support (sync).
Args:
self: The agent instance.
a2a_agents: List of A2A agent configurations.
original_kickoff: The original kickoff method.
messages: Messages to send to the agent.
response_format: Optional response format.
input_files: Optional input files.
agent_response_model: Optional agent response model.
extension_registry: Registry of A2A extensions.
Returns:
LiteAgentOutput from kickoff or A2A delegation.
"""
if isinstance(messages, str):
description = messages
else:
content = next(
(m["content"] for m in reversed(messages) if m["role"] == "user"),
None,
)
description = content if isinstance(content, str) else ""
if not description:
return original_kickoff(self, messages, response_format, input_files)
fake_task = Task(
description=description,
agent=self,
expected_output="Result from A2A delegation",
input_files=input_files or {},
)
agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents)
if not agent_cards and a2a_agents and failed_agents:
return original_kickoff(self, messages, response_format, input_files)
fake_task.description, _, extension_states = _augment_prompt_with_a2a(
a2a_agents=a2a_agents,
task_description=description,
agent_cards=agent_cards,
failed_agents=failed_agents,
extension_registry=extension_registry,
)
fake_task.response_model = agent_response_model
try:
result: LiteAgentOutput = original_kickoff(
self, messages, agent_response_model or response_format, input_files
)
agent_response = _parse_agent_response(
raw_result=result.raw, agent_response_model=agent_response_model
)
if extension_registry and isinstance(agent_response, BaseModel):
agent_response = extension_registry.process_response_with_all(
agent_response, extension_states
)
if isinstance(agent_response, BaseModel) and isinstance(
agent_response, AgentResponseProtocol
):
if agent_response.is_a2a:
def _kickoff_adapter(
self_: Agent,
_task: Task,
_context: str | None,
_tools: list[Any] | None,
) -> str:
fmt = (
_task.response_model or agent_response_model or response_format
)
output: LiteAgentOutput = original_kickoff(
self_, messages, fmt, input_files
)
return output.raw
result_str = _delegate_to_a2a(
self,
agent_response=agent_response,
task=fake_task,
original_fn=_kickoff_adapter,
context=None,
tools=None,
agent_cards=agent_cards,
original_task_description=description,
_extension_registry=extension_registry,
)
return LiteAgentOutput(
raw=result_str,
pydantic=None,
agent_role=self.role,
usage_metrics=None,
messages=[],
)
return LiteAgentOutput(
raw=agent_response.message,
pydantic=None,
agent_role=self.role,
usage_metrics=result.usage_metrics,
messages=result.messages,
)
return result
finally:
fake_task.description = description
async def _akickoff_with_a2a(
self: Agent,
a2a_agents: list[A2AConfig | A2AClientConfig],
original_kickoff_async: Callable[..., Coroutine[Any, Any, LiteAgentOutput]],
messages: str | list[Any],
response_format: type[Any] | None,
input_files: dict[str, Any] | None,
agent_response_model: type[BaseModel] | None,
extension_registry: ExtensionRegistry,
) -> LiteAgentOutput:
"""Execute kickoff with A2A delegation support (async).
Args:
self: The agent instance.
a2a_agents: List of A2A agent configurations.
original_kickoff_async: The original kickoff_async method.
messages: Messages to send to the agent.
response_format: Optional response format.
input_files: Optional input files.
agent_response_model: Optional agent response model.
extension_registry: Registry of A2A extensions.
Returns:
LiteAgentOutput from kickoff or A2A delegation.
"""
if isinstance(messages, str):
description = messages
else:
content = next(
(m["content"] for m in reversed(messages) if m["role"] == "user"),
None,
)
description = content if isinstance(content, str) else ""
if not description:
return await original_kickoff_async(
self, messages, response_format, input_files
)
fake_task = Task(
description=description,
agent=self,
expected_output="Result from A2A delegation",
input_files=input_files or {},
)
agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents)
if not agent_cards and a2a_agents and failed_agents:
return await original_kickoff_async(
self, messages, response_format, input_files
)
fake_task.description, _, extension_states = _augment_prompt_with_a2a(
a2a_agents=a2a_agents,
task_description=description,
agent_cards=agent_cards,
failed_agents=failed_agents,
extension_registry=extension_registry,
)
fake_task.response_model = agent_response_model
try:
result: LiteAgentOutput = await original_kickoff_async(
self, messages, agent_response_model or response_format, input_files
)
agent_response = _parse_agent_response(
raw_result=result.raw, agent_response_model=agent_response_model
)
if extension_registry and isinstance(agent_response, BaseModel):
agent_response = extension_registry.process_response_with_all(
agent_response, extension_states
)
if isinstance(agent_response, BaseModel) and isinstance(
agent_response, AgentResponseProtocol
):
if agent_response.is_a2a:
async def _kickoff_adapter(
self_: Agent,
_task: Task,
_context: str | None,
_tools: list[Any] | None,
) -> str:
fmt = (
_task.response_model or agent_response_model or response_format
)
output: LiteAgentOutput = await original_kickoff_async(
self_, messages, fmt, input_files
)
return output.raw
result_str = await _adelegate_to_a2a(
self,
agent_response=agent_response,
task=fake_task,
original_fn=_kickoff_adapter,
context=None,
tools=None,
agent_cards=agent_cards,
original_task_description=description,
_extension_registry=extension_registry,
)
return LiteAgentOutput(
raw=result_str,
pydantic=None,
agent_role=self.role,
usage_metrics=None,
messages=[],
)
return LiteAgentOutput(
raw=agent_response.message,
pydantic=None,
agent_role=self.role,
usage_metrics=result.usage_metrics,
messages=result.messages,
)
return result
finally:
fake_task.description = description
def _augment_prompt_with_a2a(
a2a_agents: list[A2AConfig | A2AClientConfig],
task_description: str,
@@ -764,8 +938,8 @@ def _process_response_result(
agent_card=agent_card,
),
)
return str(llm_response.message), None
return None, str(llm_response.message)
return llm_response.message, None
return None, llm_response.message
return str(raw_result), None
@@ -1342,12 +1516,14 @@ async def _aexecute_task_with_a2a(
original_task_description=original_description,
_extension_registry=extension_registry,
)
return str(agent_response.message)
task.output_pydantic = None
return agent_response.message
return raw_result
finally:
task.description = original_description
task.output_pydantic = original_output_pydantic
if task.output_pydantic is not None:
task.output_pydantic = original_output_pydantic
task.response_model = original_response_model

View File

@@ -94,6 +94,12 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
from crewai.a2a.types import AgentResponseProtocol
except ImportError:
AgentResponseProtocol = None # type: ignore[assignment, misc]
if TYPE_CHECKING:
from crewai_files import FileInput
from crewai_tools import CodeInterpreterTool
@@ -490,9 +496,22 @@ class Agent(BaseAgent):
self._rpm_controller.stop_rpm_counter()
result = process_tool_results(self, result)
output_for_event = result
if (
AgentResponseProtocol is not None
and isinstance(result, BaseModel)
and isinstance(result, AgentResponseProtocol)
):
output_for_event = str(result.message)
elif not isinstance(result, str):
output_for_event = str(result)
crewai_event_bus.emit(
self,
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
event=AgentExecutionCompletedEvent(
agent=self, task=task, output=output_for_event
),
)
save_last_messages(self)
@@ -709,9 +728,22 @@ class Agent(BaseAgent):
self._rpm_controller.stop_rpm_counter()
result = process_tool_results(self, result)
output_for_event = result
if (
AgentResponseProtocol is not None
and isinstance(result, BaseModel)
and isinstance(result, AgentResponseProtocol)
):
output_for_event = str(result.message)
elif not isinstance(result, str):
output_for_event = str(result)
crewai_event_bus.emit(
self,
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
event=AgentExecutionCompletedEvent(
agent=self, task=task, output=output_for_event
),
)
save_last_messages(self)
@@ -1863,25 +1895,30 @@ class Agent(BaseAgent):
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
if isinstance(raw_output, BaseModel) and isinstance(
raw_output, response_format
):
formatted_result = raw_output
elif isinstance(raw_output, str):
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
@@ -1889,8 +1926,16 @@ class Agent(BaseAgent):
else:
usage_metrics = self._token_process.get_summary()
raw_str = (
raw_output
if isinstance(raw_output, str)
else raw_output.model_dump_json()
if isinstance(raw_output, BaseModel)
else str(raw_output)
)
return LiteAgentOutput(
raw=raw_output,
raw=raw_str,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
@@ -1925,25 +1970,30 @@ class Agent(BaseAgent):
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
if isinstance(raw_output, BaseModel) and isinstance(
raw_output, response_format
):
formatted_result = raw_output
elif isinstance(raw_output, str):
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
@@ -1951,8 +2001,16 @@ class Agent(BaseAgent):
else:
usage_metrics = self._token_process.get_summary()
raw_str = (
raw_output
if isinstance(raw_output, str)
else raw_output.model_dump_json()
if isinstance(raw_output, BaseModel)
else str(raw_output)
)
return LiteAgentOutput(
raw=raw_output,
raw=raw_str,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,

View File

@@ -365,11 +365,20 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=None,
response_model=self.response_model,
executor_context=self,
verbose=self.agent.verbose,
)
# If response is structured output (BaseModel), store it directly
if isinstance(answer, BaseModel):
self.state.current_answer = AgentFinish(
thought="",
output=answer,
text=str(answer),
)
return "parsed"
# Parse the LLM response
formatted_answer = process_llm_response(answer, self.use_stop_words)
@@ -436,7 +445,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=None,
response_model=self.response_model,
executor_context=self,
verbose=self.agent.verbose,
)
@@ -448,6 +457,17 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "native_tool_calls"
# Structured output (BaseModel) response
if isinstance(answer, BaseModel):
self.state.current_answer = AgentFinish(
thought="",
output=answer,
text=str(answer),
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(str(answer))
return "native_finished"
# Text response - this is the final answer
if isinstance(answer, str):
self.state.current_answer = AgentFinish(
@@ -1300,7 +1320,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
Returns:
Final answer after feedback.
"""
human_feedback = self._ask_human_input(formatted_answer.output)
output_str = (
str(formatted_answer.output)
if isinstance(formatted_answer.output, BaseModel)
else formatted_answer.output
)
human_feedback = self._ask_human_input(output_str)
if self._is_training_mode():
return self._handle_training_feedback(formatted_answer, human_feedback)
@@ -1372,7 +1397,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.ask_for_human_input = False
else:
answer = self._process_feedback_iteration(feedback)
feedback = self._ask_human_input(answer.output)
output_str = (
str(answer.output)
if isinstance(answer.output, BaseModel)
else answer.output
)
feedback = self._ask_human_input(output_str)
return answer