diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index c87e88de7..307ba0c90 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -47,13 +47,13 @@ from crewai.events.types.a2a_events import ( A2AMessageSentEvent, ) from crewai.lite_agent_output import LiteAgentOutput +from crewai.task import Task if TYPE_CHECKING: from a2a.types import AgentCard, Message from crewai.agent.core import Agent - from crewai.task import Task from crewai.tools.base_tool import BaseTool @@ -186,53 +186,17 @@ def wrap_agent_with_a2a_instance( if not a2a_agents: return original_kickoff(self, messages, response_format, input_files) - if isinstance(messages, str): - description = messages - else: - content = next( - (m["content"] for m in reversed(messages) if m["role"] == "user"), - None, - ) - description = content if isinstance(content, str) else "" - - if not description: - return original_kickoff(self, messages, response_format, input_files) - - fake_task = Task( - description=description, - agent=self, - expected_output="Result from A2A delegation", - input_files=input_files or {}, - ) - - def task_to_kickoff_adapter( - self_: Any, task: Task, context: str | None, tools: list[Any] | None - ) -> str: - """Adapt execute_task signature to kickoff for delegation.""" - result: LiteAgentOutput = original_kickoff( - self_, messages, response_format, input_files - ) - return result.raw - - result_str = _execute_task_with_a2a( + return _kickoff_with_a2a( self=self, a2a_agents=a2a_agents, - original_fn=task_to_kickoff_adapter, - task=fake_task, + original_kickoff=original_kickoff, + messages=messages, + response_format=response_format, + input_files=input_files, agent_response_model=agent_response_model, - context=None, - tools=None, extension_registry=extension_registry, ) - return LiteAgentOutput( - raw=result_str, - pydantic=None, - agent_role=self.role, - usage_metrics=None, - messages=[], - ) - @wraps(original_kickoff_async) async def kickoff_async_with_a2a( self: Agent, @@ -253,55 +217,17 @@ def wrap_agent_with_a2a_instance( self, messages, response_format, input_files ) - if isinstance(messages, str): - description = messages - else: - content = next( - (m["content"] for m in reversed(messages) if m["role"] == "user"), - None, - ) - description = content if isinstance(content, str) else "" - - if not description: - return await original_kickoff_async( - self, messages, response_format, input_files - ) - - fake_task = Task( - description=description, - agent=self, - expected_output="Result from A2A delegation", - input_files=input_files or {}, - ) - - async def task_to_kickoff_adapter( - self_: Any, task: Task, context: str | None, tools: list[Any] | None - ) -> str: - """Adapt execute_task signature to kickoff_async for delegation.""" - result: LiteAgentOutput = await original_kickoff_async( - self_, messages, response_format, input_files - ) - return result.raw - - result_str = await _aexecute_task_with_a2a( + return await _akickoff_with_a2a( self=self, a2a_agents=a2a_agents, - original_fn=task_to_kickoff_adapter, - task=fake_task, + original_kickoff_async=original_kickoff_async, + messages=messages, + response_format=response_format, + input_files=input_files, agent_response_model=agent_response_model, - context=None, - tools=None, extension_registry=extension_registry, ) - return LiteAgentOutput( - raw=result_str, - pydantic=None, - agent_role=self.role, - usage_metrics=None, - messages=[], - ) - object.__setattr__(agent, "kickoff", MethodType(kickoff_with_a2a, agent)) object.__setattr__( agent, "kickoff_async", MethodType(kickoff_async_with_a2a, agent) @@ -451,15 +377,263 @@ def _execute_task_with_a2a( original_task_description=original_description, _extension_registry=extension_registry, ) - return str(agent_response.message) + task.output_pydantic = None + return agent_response.message return raw_result finally: task.description = original_description - task.output_pydantic = original_output_pydantic + if task.output_pydantic is not None: + task.output_pydantic = original_output_pydantic task.response_model = original_response_model +def _kickoff_with_a2a( + self: Agent, + a2a_agents: list[A2AConfig | A2AClientConfig], + original_kickoff: Callable[..., LiteAgentOutput], + messages: str | list[Any], + response_format: type[Any] | None, + input_files: dict[str, Any] | None, + agent_response_model: type[BaseModel] | None, + extension_registry: ExtensionRegistry, +) -> LiteAgentOutput: + """Execute kickoff with A2A delegation support (sync). + + Args: + self: The agent instance. + a2a_agents: List of A2A agent configurations. + original_kickoff: The original kickoff method. + messages: Messages to send to the agent. + response_format: Optional response format. + input_files: Optional input files. + agent_response_model: Optional agent response model. + extension_registry: Registry of A2A extensions. + + Returns: + LiteAgentOutput from kickoff or A2A delegation. + """ + if isinstance(messages, str): + description = messages + else: + content = next( + (m["content"] for m in reversed(messages) if m["role"] == "user"), + None, + ) + description = content if isinstance(content, str) else "" + + if not description: + return original_kickoff(self, messages, response_format, input_files) + + fake_task = Task( + description=description, + agent=self, + expected_output="Result from A2A delegation", + input_files=input_files or {}, + ) + + agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents) + + if not agent_cards and a2a_agents and failed_agents: + return original_kickoff(self, messages, response_format, input_files) + + fake_task.description, _, extension_states = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=description, + agent_cards=agent_cards, + failed_agents=failed_agents, + extension_registry=extension_registry, + ) + fake_task.response_model = agent_response_model + + try: + result: LiteAgentOutput = original_kickoff( + self, messages, agent_response_model or response_format, input_files + ) + agent_response = _parse_agent_response( + raw_result=result.raw, agent_response_model=agent_response_model + ) + + if extension_registry and isinstance(agent_response, BaseModel): + agent_response = extension_registry.process_response_with_all( + agent_response, extension_states + ) + + if isinstance(agent_response, BaseModel) and isinstance( + agent_response, AgentResponseProtocol + ): + if agent_response.is_a2a: + + def _kickoff_adapter( + self_: Agent, + _task: Task, + _context: str | None, + _tools: list[Any] | None, + ) -> str: + fmt = ( + _task.response_model or agent_response_model or response_format + ) + output: LiteAgentOutput = original_kickoff( + self_, messages, fmt, input_files + ) + return output.raw + + result_str = _delegate_to_a2a( + self, + agent_response=agent_response, + task=fake_task, + original_fn=_kickoff_adapter, + context=None, + tools=None, + agent_cards=agent_cards, + original_task_description=description, + _extension_registry=extension_registry, + ) + return LiteAgentOutput( + raw=result_str, + pydantic=None, + agent_role=self.role, + usage_metrics=None, + messages=[], + ) + return LiteAgentOutput( + raw=agent_response.message, + pydantic=None, + agent_role=self.role, + usage_metrics=result.usage_metrics, + messages=result.messages, + ) + + return result + finally: + fake_task.description = description + + +async def _akickoff_with_a2a( + self: Agent, + a2a_agents: list[A2AConfig | A2AClientConfig], + original_kickoff_async: Callable[..., Coroutine[Any, Any, LiteAgentOutput]], + messages: str | list[Any], + response_format: type[Any] | None, + input_files: dict[str, Any] | None, + agent_response_model: type[BaseModel] | None, + extension_registry: ExtensionRegistry, +) -> LiteAgentOutput: + """Execute kickoff with A2A delegation support (async). + + Args: + self: The agent instance. + a2a_agents: List of A2A agent configurations. + original_kickoff_async: The original kickoff_async method. + messages: Messages to send to the agent. + response_format: Optional response format. + input_files: Optional input files. + agent_response_model: Optional agent response model. + extension_registry: Registry of A2A extensions. + + Returns: + LiteAgentOutput from kickoff or A2A delegation. + """ + if isinstance(messages, str): + description = messages + else: + content = next( + (m["content"] for m in reversed(messages) if m["role"] == "user"), + None, + ) + description = content if isinstance(content, str) else "" + + if not description: + return await original_kickoff_async( + self, messages, response_format, input_files + ) + + fake_task = Task( + description=description, + agent=self, + expected_output="Result from A2A delegation", + input_files=input_files or {}, + ) + + agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents) + + if not agent_cards and a2a_agents and failed_agents: + return await original_kickoff_async( + self, messages, response_format, input_files + ) + + fake_task.description, _, extension_states = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=description, + agent_cards=agent_cards, + failed_agents=failed_agents, + extension_registry=extension_registry, + ) + fake_task.response_model = agent_response_model + + try: + result: LiteAgentOutput = await original_kickoff_async( + self, messages, agent_response_model or response_format, input_files + ) + agent_response = _parse_agent_response( + raw_result=result.raw, agent_response_model=agent_response_model + ) + + if extension_registry and isinstance(agent_response, BaseModel): + agent_response = extension_registry.process_response_with_all( + agent_response, extension_states + ) + + if isinstance(agent_response, BaseModel) and isinstance( + agent_response, AgentResponseProtocol + ): + if agent_response.is_a2a: + + async def _kickoff_adapter( + self_: Agent, + _task: Task, + _context: str | None, + _tools: list[Any] | None, + ) -> str: + fmt = ( + _task.response_model or agent_response_model or response_format + ) + output: LiteAgentOutput = await original_kickoff_async( + self_, messages, fmt, input_files + ) + return output.raw + + result_str = await _adelegate_to_a2a( + self, + agent_response=agent_response, + task=fake_task, + original_fn=_kickoff_adapter, + context=None, + tools=None, + agent_cards=agent_cards, + original_task_description=description, + _extension_registry=extension_registry, + ) + return LiteAgentOutput( + raw=result_str, + pydantic=None, + agent_role=self.role, + usage_metrics=None, + messages=[], + ) + return LiteAgentOutput( + raw=agent_response.message, + pydantic=None, + agent_role=self.role, + usage_metrics=result.usage_metrics, + messages=result.messages, + ) + + return result + finally: + fake_task.description = description + + def _augment_prompt_with_a2a( a2a_agents: list[A2AConfig | A2AClientConfig], task_description: str, @@ -764,8 +938,8 @@ def _process_response_result( agent_card=agent_card, ), ) - return str(llm_response.message), None - return None, str(llm_response.message) + return llm_response.message, None + return None, llm_response.message return str(raw_result), None @@ -1342,12 +1516,14 @@ async def _aexecute_task_with_a2a( original_task_description=original_description, _extension_registry=extension_registry, ) - return str(agent_response.message) + task.output_pydantic = None + return agent_response.message return raw_result finally: task.description = original_description - task.output_pydantic = original_output_pydantic + if task.output_pydantic is not None: + task.output_pydantic = original_output_pydantic task.response_model = original_response_model diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 8c414cd67..270c1c8b8 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -94,6 +94,12 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.training_handler import CrewTrainingHandler +try: + from crewai.a2a.types import AgentResponseProtocol +except ImportError: + AgentResponseProtocol = None # type: ignore[assignment, misc] + + if TYPE_CHECKING: from crewai_files import FileInput from crewai_tools import CodeInterpreterTool @@ -490,9 +496,22 @@ class Agent(BaseAgent): self._rpm_controller.stop_rpm_counter() result = process_tool_results(self, result) + + output_for_event = result + if ( + AgentResponseProtocol is not None + and isinstance(result, BaseModel) + and isinstance(result, AgentResponseProtocol) + ): + output_for_event = str(result.message) + elif not isinstance(result, str): + output_for_event = str(result) + crewai_event_bus.emit( self, - event=AgentExecutionCompletedEvent(agent=self, task=task, output=result), + event=AgentExecutionCompletedEvent( + agent=self, task=task, output=output_for_event + ), ) save_last_messages(self) @@ -709,9 +728,22 @@ class Agent(BaseAgent): self._rpm_controller.stop_rpm_counter() result = process_tool_results(self, result) + + output_for_event = result + if ( + AgentResponseProtocol is not None + and isinstance(result, BaseModel) + and isinstance(result, AgentResponseProtocol) + ): + output_for_event = str(result.message) + elif not isinstance(result, str): + output_for_event = str(result) + crewai_event_bus.emit( self, - event=AgentExecutionCompletedEvent(agent=self, task=task, output=result), + event=AgentExecutionCompletedEvent( + agent=self, task=task, output=output_for_event + ), ) save_last_messages(self) @@ -1863,25 +1895,30 @@ class Agent(BaseAgent): # Handle response format conversion formatted_result: BaseModel | None = None if response_format: - try: - model_schema = generate_model_description(response_format) - schema = json.dumps(model_schema, indent=2) - instructions = self.i18n.slice("formatted_task_instructions").format( - output_format=schema - ) + if isinstance(raw_output, BaseModel) and isinstance( + raw_output, response_format + ): + formatted_result = raw_output + elif isinstance(raw_output, str): + try: + model_schema = generate_model_description(response_format) + schema = json.dumps(model_schema, indent=2) + instructions = self.i18n.slice( + "formatted_task_instructions" + ).format(output_format=schema) - converter = Converter( - llm=self.llm, - text=raw_output, - model=response_format, - instructions=instructions, - ) + converter = Converter( + llm=self.llm, + text=raw_output, + model=response_format, + instructions=instructions, + ) - conversion_result = converter.to_pydantic() - if isinstance(conversion_result, BaseModel): - formatted_result = conversion_result - except ConverterError: - pass # Keep raw output if conversion fails + conversion_result = converter.to_pydantic() + if isinstance(conversion_result, BaseModel): + formatted_result = conversion_result + except ConverterError: + pass # Keep raw output if conversion fails # Get token usage metrics if isinstance(self.llm, BaseLLM): @@ -1889,8 +1926,16 @@ class Agent(BaseAgent): else: usage_metrics = self._token_process.get_summary() + raw_str = ( + raw_output + if isinstance(raw_output, str) + else raw_output.model_dump_json() + if isinstance(raw_output, BaseModel) + else str(raw_output) + ) + return LiteAgentOutput( - raw=raw_output, + raw=raw_str, pydantic=formatted_result, agent_role=self.role, usage_metrics=usage_metrics.model_dump() if usage_metrics else None, @@ -1925,25 +1970,30 @@ class Agent(BaseAgent): # Handle response format conversion formatted_result: BaseModel | None = None if response_format: - try: - model_schema = generate_model_description(response_format) - schema = json.dumps(model_schema, indent=2) - instructions = self.i18n.slice("formatted_task_instructions").format( - output_format=schema - ) + if isinstance(raw_output, BaseModel) and isinstance( + raw_output, response_format + ): + formatted_result = raw_output + elif isinstance(raw_output, str): + try: + model_schema = generate_model_description(response_format) + schema = json.dumps(model_schema, indent=2) + instructions = self.i18n.slice( + "formatted_task_instructions" + ).format(output_format=schema) - converter = Converter( - llm=self.llm, - text=raw_output, - model=response_format, - instructions=instructions, - ) + converter = Converter( + llm=self.llm, + text=raw_output, + model=response_format, + instructions=instructions, + ) - conversion_result = converter.to_pydantic() - if isinstance(conversion_result, BaseModel): - formatted_result = conversion_result - except ConverterError: - pass # Keep raw output if conversion fails + conversion_result = converter.to_pydantic() + if isinstance(conversion_result, BaseModel): + formatted_result = conversion_result + except ConverterError: + pass # Keep raw output if conversion fails # Get token usage metrics if isinstance(self.llm, BaseLLM): @@ -1951,8 +2001,16 @@ class Agent(BaseAgent): else: usage_metrics = self._token_process.get_summary() + raw_str = ( + raw_output + if isinstance(raw_output, str) + else raw_output.model_dump_json() + if isinstance(raw_output, BaseModel) + else str(raw_output) + ) + return LiteAgentOutput( - raw=raw_output, + raw=raw_str, pydantic=formatted_result, agent_role=self.role, usage_metrics=usage_metrics.model_dump() if usage_metrics else None, diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index c7a727c30..fc77f9c35 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -365,11 +365,20 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): printer=self._printer, from_task=self.task, from_agent=self.agent, - response_model=None, + response_model=self.response_model, executor_context=self, verbose=self.agent.verbose, ) + # If response is structured output (BaseModel), store it directly + if isinstance(answer, BaseModel): + self.state.current_answer = AgentFinish( + thought="", + output=answer, + text=str(answer), + ) + return "parsed" + # Parse the LLM response formatted_answer = process_llm_response(answer, self.use_stop_words) @@ -436,7 +445,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): available_functions=None, from_task=self.task, from_agent=self.agent, - response_model=None, + response_model=self.response_model, executor_context=self, verbose=self.agent.verbose, ) @@ -448,6 +457,17 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): return "native_tool_calls" + # Structured output (BaseModel) response + if isinstance(answer, BaseModel): + self.state.current_answer = AgentFinish( + thought="", + output=answer, + text=str(answer), + ) + self._invoke_step_callback(self.state.current_answer) + self._append_message_to_state(str(answer)) + return "native_finished" + # Text response - this is the final answer if isinstance(answer, str): self.state.current_answer = AgentFinish( @@ -1300,7 +1320,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): Returns: Final answer after feedback. """ - human_feedback = self._ask_human_input(formatted_answer.output) + output_str = ( + str(formatted_answer.output) + if isinstance(formatted_answer.output, BaseModel) + else formatted_answer.output + ) + human_feedback = self._ask_human_input(output_str) if self._is_training_mode(): return self._handle_training_feedback(formatted_answer, human_feedback) @@ -1372,7 +1397,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self.state.ask_for_human_input = False else: answer = self._process_feedback_iteration(feedback) - feedback = self._ask_human_input(answer.output) + output_str = ( + str(answer.output) + if isinstance(answer.output, BaseModel) + else answer.output + ) + feedback = self._ask_human_input(output_str) return answer