diff --git a/lib/crewai/src/crewai/agents/step_executor.py b/lib/crewai/src/crewai/agents/step_executor.py index 5b05acc49..043b1000f 100644 --- a/lib/crewai/src/crewai/agents/step_executor.py +++ b/lib/crewai/src/crewai/agents/step_executor.py @@ -13,12 +13,20 @@ this class single-purpose and fast. from __future__ import annotations from collections.abc import Callable +from datetime import datetime +import json import time from typing import TYPE_CHECKING, Any from pydantic import BaseModel from crewai.agents.parser import AgentAction, AgentFinish +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.tool_usage_events import ( + ToolUsageErrorEvent, + ToolUsageFinishedEvent, + ToolUsageStartedEvent, +) from crewai.utilities.agent_utils import ( build_tool_calls_assistant_message, check_native_tool_support, @@ -140,6 +148,7 @@ class StepExecutor: result_text = self._execute_native(messages, tool_calls_made) else: result_text = self._execute_text_parsed(messages, tool_calls_made) + self._validate_expected_tool_usage(todo, tool_calls_made) elapsed = time.monotonic() - start_time return StepResult( @@ -265,7 +274,28 @@ class StepExecutor: if isinstance(formatted, AgentAction): tool_calls_made.append(formatted.tool) + return self._execute_text_tool_with_events(formatted) + # Raw text response — treat as the step result + return answer_str + + def _execute_text_tool_with_events(self, formatted: AgentAction) -> str: + """Execute text-parsed tool calls with tool usage events.""" + args_dict = self._parse_tool_args(formatted.tool_input) + agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown" + started_at = datetime.now() + crewai_event_bus.emit( + self, + event=ToolUsageStartedEvent( + tool_name=formatted.tool, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + agent_key=agent_key, + ), + ) + + try: fingerprint_context = {} if ( self.agent @@ -289,11 +319,75 @@ class StepExecutor: function_calling_llm=self.function_calling_llm, crew=self.crew, ) + except Exception as e: + crewai_event_bus.emit( + self, + event=ToolUsageErrorEvent( + tool_name=formatted.tool, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + agent_key=agent_key, + error=e, + ), + ) + raise - return str(tool_result.result) + crewai_event_bus.emit( + self, + event=ToolUsageFinishedEvent( + output=str(tool_result.result), + tool_name=formatted.tool, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + agent_key=agent_key, + started_at=started_at, + finished_at=datetime.now(), + ), + ) + return str(tool_result.result) - # Raw text response — treat as the step result - return answer_str + def _parse_tool_args(self, tool_input: Any) -> dict[str, Any]: + """Parse tool args from the parser output into a dict payload for events.""" + if isinstance(tool_input, dict): + return tool_input + if isinstance(tool_input, str): + stripped_input = tool_input.strip() + if not stripped_input: + return {} + try: + parsed = json.loads(stripped_input) + if isinstance(parsed, dict): + return parsed + return {"input": parsed} + except json.JSONDecodeError: + return {"input": stripped_input} + return {"input": str(tool_input)} + + def _validate_expected_tool_usage( + self, + todo: TodoItem, + tool_calls_made: list[str], + ) -> None: + """Fail step execution when a required tool is configured but not called.""" + expected_tool = getattr(todo, "tool_to_use", None) + if not expected_tool: + return + expected_tool_name = sanitize_tool_name(expected_tool) + available_tool_names = { + sanitize_tool_name(tool.name) + for tool in self.tools + if getattr(tool, "name", "") + } | set(self._available_functions.keys()) + if expected_tool_name not in available_tool_names: + return + called_names = {sanitize_tool_name(name) for name in tool_calls_made} + if expected_tool_name not in called_names: + raise ValueError( + f"Expected tool '{expected_tool_name}' was not called " + f"for step {todo.step_number}." + ) def _execute_native( self, diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 6351bedbd..bbb30d370 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -2752,7 +2752,24 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): if self.step_callback: cb_result = self.step_callback(formatted_answer) if inspect.iscoroutine(cb_result): - asyncio.run(cb_result) + if is_inside_event_loop(): + callback_task = asyncio.create_task(cb_result) + callback_task.add_done_callback( + self._handle_step_callback_task_result + ) + else: + asyncio.run(cb_result) + + def _handle_step_callback_task_result(self, task: asyncio.Task[Any]) -> None: + """Surface async callback errors without crashing the flow event loop.""" + try: + task.result() + except Exception as e: + if self.agent.verbose: + self._printer.print( + content=f"Error in async step_callback task: {e!s}", + color="red", + ) def _append_message_to_state( self, text: str, role: Literal["user", "assistant", "system"] = "assistant" diff --git a/lib/crewai/tests/agents/test_agent_executor.py b/lib/crewai/tests/agents/test_agent_executor.py index 3fa70e1e1..7dc4bfc7f 100644 --- a/lib/crewai/tests/agents/test_agent_executor.py +++ b/lib/crewai/tests/agents/test_agent_executor.py @@ -4,16 +4,26 @@ Tests the Flow-based agent executor implementation including state management, flow methods, routing logic, and error handling. """ +import asyncio import time -from unittest.mock import Mock, patch +from unittest.mock import AsyncMock, Mock, patch import pytest +from crewai.agents.step_executor import StepExecutor from crewai.experimental.agent_executor import ( AgentReActState, AgentExecutor, ) from crewai.agents.parser import AgentAction, AgentFinish +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.tool_usage_events import ( + ToolUsageFinishedEvent, + ToolUsageStartedEvent, +) +from crewai.tools.tool_types import ToolResult +from crewai.utilities.step_execution_context import StepExecutionContext +from crewai.utilities.planning_types import TodoItem class TestAgentReActState: """Test AgentReActState Pydantic model.""" @@ -246,6 +256,113 @@ class TestAgentExecutor: AgentFinish(thought="thinking", output="test", text="final") ) + @pytest.mark.asyncio + async def test_invoke_step_callback_async_inside_running_loop( + self, mock_dependencies + ): + """Test async step callback scheduling when already in an event loop.""" + callback = AsyncMock() + mock_dependencies["step_callback"] = callback + executor = AgentExecutor(**mock_dependencies) + + answer = AgentFinish(thought="thinking", output="test", text="final") + with patch("crewai.experimental.agent_executor.asyncio.run") as mock_run: + executor._invoke_step_callback(answer) + await asyncio.sleep(0) + + callback.assert_awaited_once_with(answer) + mock_run.assert_not_called() + + +class TestStepExecutorCriticalFixes: + """Regression tests for critical plan-and-execute issues.""" + + @pytest.fixture + def step_executor(self): + llm = Mock() + llm.supports_stop_words.return_value = True + + agent = Mock() + agent.role = "Test Agent" + agent.goal = "Execute tasks" + agent.verbose = False + agent.key = "test-agent-key" + + tool = Mock() + tool.name = "count_words" + task = Mock() + task.name = "test-task" + task.description = "test task description" + + return StepExecutor( + llm=llm, + tools=[tool], + agent=agent, + original_tools=[], + tools_handler=Mock(), + task=task, + crew=Mock(), + function_calling_llm=None, + request_within_rpm_limit=None, + callbacks=[], + ) + + def test_step_executor_fails_when_expected_tool_is_not_called(self, step_executor): + """Step should fail if a configured expected tool is not actually invoked.""" + todo = TodoItem( + step_number=1, + description="Count words in input text.", + tool_to_use="count_words", + depends_on=[], + status="pending", + ) + context = StepExecutionContext(task_description="task", task_goal="goal") + + with patch.object(step_executor, "_build_isolated_messages", return_value=[]): + with patch.object( + step_executor, "_execute_text_parsed", return_value="No tool used." + ): + result = step_executor.execute(todo, context) + + assert result.success is False + assert result.error is not None + assert "Expected tool 'count_words' was not called" in result.error + + def test_step_executor_text_tool_emits_usage_events(self, step_executor): + """Text-parsed tool execution should emit started and finished events.""" + started_events: list[ToolUsageStartedEvent] = [] + finished_events: list[ToolUsageFinishedEvent] = [] + + tool_name = "count_words" + action = AgentAction( + thought="Need a tool", + tool=tool_name, + tool_input='{"text":"hello world"}', + text="Action: count_words", + ) + + @crewai_event_bus.on(ToolUsageStartedEvent) + def _on_started(_source, event): + if event.tool_name == tool_name: + started_events.append(event) + + @crewai_event_bus.on(ToolUsageFinishedEvent) + def _on_finished(_source, event): + if event.tool_name == tool_name: + finished_events.append(event) + + with patch( + "crewai.agents.step_executor.execute_tool_and_check_finality", + return_value=ToolResult(result="2", result_as_answer=False), + ): + output = step_executor._execute_text_tool_with_events(action) + + crewai_event_bus.flush() + + assert output == "2" + assert len(started_events) >= 1 + assert len(finished_events) >= 1 + @patch("crewai.experimental.agent_executor.handle_output_parser_exception") def test_recover_from_parser_error( self, mock_handle_exception, mock_dependencies