diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index db8dcfc65..8f2443e80 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -1912,17 +1912,19 @@ class Agent(BaseAgent): ) raise - def _execute_and_build_output( + def _build_output_from_result( self, + result: dict[str, Any], executor: AgentExecutor, - inputs: dict[str, str], response_format: type[Any] | None = None, ) -> LiteAgentOutput: - """Execute the agent and build the output object. + """Build a LiteAgentOutput from an executor result dict. + + Shared logic used by both sync and async execution paths. Args: + result: The result dictionary from executor.invoke / invoke_async. executor: The executor instance. - inputs: Input dictionary for execution. response_format: Optional response format. Returns: @@ -1930,8 +1932,6 @@ class Agent(BaseAgent): """ import json - # Execute the agent (this is called from sync path, so invoke returns dict) - result = cast(dict[str, Any], executor.invoke(inputs)) output = result.get("output", "") # Handle response format conversion @@ -1979,7 +1979,6 @@ class Agent(BaseAgent): else str(raw_output) ) - # Extract todo execution results from executor state todo_results = LiteAgentOutput.from_todo_items(executor.state.todos.items) return LiteAgentOutput( @@ -1994,90 +1993,25 @@ class Agent(BaseAgent): last_replan_reason=executor.state.last_replan_reason, ) + def _execute_and_build_output( + self, + executor: AgentExecutor, + inputs: dict[str, str], + response_format: type[Any] | None = None, + ) -> LiteAgentOutput: + """Execute the agent synchronously and build the output object.""" + result = cast(dict[str, Any], executor.invoke(inputs)) + return self._build_output_from_result(result, executor, response_format) + async def _execute_and_build_output_async( self, executor: AgentExecutor, inputs: dict[str, str], response_format: type[Any] | None = None, ) -> LiteAgentOutput: - """Execute the agent asynchronously and build the output object. - - This is the async version of _execute_and_build_output that uses - invoke_async() for native async execution within event loops. - - Args: - executor: The executor instance. - inputs: Input dictionary for execution. - response_format: Optional response format. - - Returns: - LiteAgentOutput with raw output, formatted result, and metrics. - """ - import json - - # Execute the agent asynchronously + """Execute the agent asynchronously and build the output object.""" result = await executor.invoke_async(inputs) - output = result.get("output", "") - - # Handle response format conversion - formatted_result: BaseModel | None = None - raw_output: str - - if isinstance(output, BaseModel): - formatted_result = output - raw_output = output.model_dump_json() - elif response_format: - raw_output = str(output) if not isinstance(output, str) else output - try: - model_schema = generate_model_description(response_format) - schema = json.dumps(model_schema, indent=2) - instructions = self.i18n.slice("formatted_task_instructions").format( - output_format=schema - ) - - converter = Converter( - llm=self.llm, - text=raw_output, - model=response_format, - instructions=instructions, - ) - - conversion_result = converter.to_pydantic() - if isinstance(conversion_result, BaseModel): - formatted_result = conversion_result - except ConverterError: - pass # Keep raw output if conversion fails - else: - raw_output = str(output) if not isinstance(output, str) else output - - # Get token usage metrics - if isinstance(self.llm, BaseLLM): - usage_metrics = self.llm.get_token_usage_summary() - else: - usage_metrics = self._token_process.get_summary() - - raw_str = ( - raw_output - if isinstance(raw_output, str) - else raw_output.model_dump_json() - if isinstance(raw_output, BaseModel) - else str(raw_output) - ) - - # Extract todo execution results from executor state - todo_results = LiteAgentOutput.from_todo_items(executor.state.todos.items) - - return LiteAgentOutput( - raw=raw_str, - pydantic=formatted_result, - agent_role=self.role, - usage_metrics=usage_metrics.model_dump() if usage_metrics else None, - messages=list(executor.state.messages), - plan=executor.state.plan, - todos=todo_results, - replan_count=executor.state.replan_count, - last_replan_reason=executor.state.last_replan_reason, - ) + return self._build_output_from_result(result, executor, response_format) def _process_kickoff_guardrail( self,