enhance step executor with tool usage events and validation

- Added event emissions for tool usage, including started and finished events, to track tool execution.
- Implemented validation to ensure the expected tool is called during step execution, raising an error when it is not.
- Refactored the text-parsed execution path to delegate tool execution to `_execute_text_tool_with_events`, which adds event logging.
- Introduced a new method, `_parse_tool_args`, for parsing tool input into a structured format.
- Updated tests to cover new functionality and ensure correct behavior of tool usage events.
This commit is contained in:
lorenzejay
2026-02-24 14:19:27 -08:00
parent 32059c7d79
commit 3302c5ab77
3 changed files with 233 additions and 5 deletions

View File

@@ -13,12 +13,20 @@ this class single-purpose and fast.
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
import json
import time
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel
from crewai.agents.parser import AgentAction, AgentFinish
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.agent_utils import (
build_tool_calls_assistant_message,
check_native_tool_support,
@@ -140,6 +148,7 @@ class StepExecutor:
result_text = self._execute_native(messages, tool_calls_made)
else:
result_text = self._execute_text_parsed(messages, tool_calls_made)
self._validate_expected_tool_usage(todo, tool_calls_made)
elapsed = time.monotonic() - start_time
return StepResult(
@@ -265,7 +274,28 @@ class StepExecutor:
if isinstance(formatted, AgentAction):
tool_calls_made.append(formatted.tool)
return self._execute_text_tool_with_events(formatted)
# Raw text response — treat as the step result
return answer_str
def _execute_text_tool_with_events(self, formatted: AgentAction) -> str:
"""Execute text-parsed tool calls with tool usage events."""
args_dict = self._parse_tool_args(formatted.tool_input)
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=formatted.tool,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
),
)
try:
fingerprint_context = {}
if (
self.agent
@@ -289,11 +319,75 @@ class StepExecutor:
function_calling_llm=self.function_calling_llm,
crew=self.crew,
)
except Exception as e:
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=formatted.tool,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
raise
return str(tool_result.result)
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=str(tool_result.result),
tool_name=formatted.tool,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
return str(tool_result.result)
# Raw text response — treat as the step result
return answer_str
def _parse_tool_args(self, tool_input: Any) -> dict[str, Any]:
"""Parse tool args from the parser output into a dict payload for events."""
if isinstance(tool_input, dict):
return tool_input
if isinstance(tool_input, str):
stripped_input = tool_input.strip()
if not stripped_input:
return {}
try:
parsed = json.loads(stripped_input)
if isinstance(parsed, dict):
return parsed
return {"input": parsed}
except json.JSONDecodeError:
return {"input": stripped_input}
return {"input": str(tool_input)}
def _validate_expected_tool_usage(
self,
todo: TodoItem,
tool_calls_made: list[str],
) -> None:
"""Fail step execution when a required tool is configured but not called."""
expected_tool = getattr(todo, "tool_to_use", None)
if not expected_tool:
return
expected_tool_name = sanitize_tool_name(expected_tool)
available_tool_names = {
sanitize_tool_name(tool.name)
for tool in self.tools
if getattr(tool, "name", "")
} | set(self._available_functions.keys())
if expected_tool_name not in available_tool_names:
return
called_names = {sanitize_tool_name(name) for name in tool_calls_made}
if expected_tool_name not in called_names:
raise ValueError(
f"Expected tool '{expected_tool_name}' was not called "
f"for step {todo.step_number}."
)
def _execute_native(
self,

View File

@@ -2752,7 +2752,24 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
if self.step_callback:
cb_result = self.step_callback(formatted_answer)
if inspect.iscoroutine(cb_result):
asyncio.run(cb_result)
if is_inside_event_loop():
callback_task = asyncio.create_task(cb_result)
callback_task.add_done_callback(
self._handle_step_callback_task_result
)
else:
asyncio.run(cb_result)
def _handle_step_callback_task_result(self, task: asyncio.Task[Any]) -> None:
"""Surface async callback errors without crashing the flow event loop."""
try:
task.result()
except Exception as e:
if self.agent.verbose:
self._printer.print(
content=f"Error in async step_callback task: {e!s}",
color="red",
)
def _append_message_to_state(
self, text: str, role: Literal["user", "assistant", "system"] = "assistant"

View File

@@ -4,16 +4,26 @@ Tests the Flow-based agent executor implementation including state management,
flow methods, routing logic, and error handling.
"""
import asyncio
import time
from unittest.mock import Mock, patch
from unittest.mock import AsyncMock, Mock, patch
import pytest
from crewai.agents.step_executor import StepExecutor
from crewai.experimental.agent_executor import (
AgentReActState,
AgentExecutor,
)
from crewai.agents.parser import AgentAction, AgentFinish
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.tools.tool_types import ToolResult
from crewai.utilities.step_execution_context import StepExecutionContext
from crewai.utilities.planning_types import TodoItem
class TestAgentReActState:
"""Test AgentReActState Pydantic model."""
@@ -246,6 +256,113 @@ class TestAgentExecutor:
AgentFinish(thought="thinking", output="test", text="final")
)
@pytest.mark.asyncio
async def test_invoke_step_callback_async_inside_running_loop(
    self, mock_dependencies
):
    """An async step callback is scheduled on the running loop, not via asyncio.run."""
    step_callback = AsyncMock()
    final_answer = AgentFinish(thought="thinking", output="test", text="final")

    mock_dependencies["step_callback"] = step_callback
    executor = AgentExecutor(**mock_dependencies)

    run_target = "crewai.experimental.agent_executor.asyncio.run"
    with patch(run_target) as patched_run:
        executor._invoke_step_callback(final_answer)
        # Yield control once so the scheduled callback task gets to run.
        await asyncio.sleep(0)

    step_callback.assert_awaited_once_with(final_answer)
    patched_run.assert_not_called()
class TestStepExecutorCriticalFixes:
    """Regression tests for critical plan-and-execute issues."""

    @pytest.fixture
    def step_executor(self):
        """Build a StepExecutor backed by mocks with one ``count_words`` tool."""
        llm = Mock()
        llm.supports_stop_words.return_value = True

        agent = Mock()
        agent.role = "Test Agent"
        agent.goal = "Execute tasks"
        agent.verbose = False
        agent.key = "test-agent-key"

        # ``name`` must be assigned after construction: passing it to Mock()
        # would name the mock itself instead of setting the attribute.
        tool = Mock()
        tool.name = "count_words"

        task = Mock()
        task.name = "test-task"
        task.description = "test task description"

        return StepExecutor(
            llm=llm,
            tools=[tool],
            agent=agent,
            original_tools=[],
            tools_handler=Mock(),
            task=task,
            crew=Mock(),
            function_calling_llm=None,
            request_within_rpm_limit=None,
            callbacks=[],
        )

    def test_step_executor_fails_when_expected_tool_is_not_called(self, step_executor):
        """Step should fail if a configured expected tool is not actually invoked."""
        todo = TodoItem(
            step_number=1,
            description="Count words in input text.",
            tool_to_use="count_words",
            depends_on=[],
            status="pending",
        )
        context = StepExecutionContext(task_description="task", task_goal="goal")

        # Stub out message building and the text-parsed path so the step
        # "completes" without recording any tool call.
        with patch.object(step_executor, "_build_isolated_messages", return_value=[]):
            with patch.object(
                step_executor, "_execute_text_parsed", return_value="No tool used."
            ):
                result = step_executor.execute(todo, context)

        assert result.success is False
        assert result.error is not None
        assert "Expected tool 'count_words' was not called" in result.error

    def test_step_executor_text_tool_emits_usage_events(self, step_executor):
        """Text-parsed tool execution should emit started and finished events."""
        started_events: list[ToolUsageStartedEvent] = []
        finished_events: list[ToolUsageFinishedEvent] = []
        tool_name = "count_words"
        action = AgentAction(
            thought="Need a tool",
            tool=tool_name,
            tool_input='{"text":"hello world"}',
            text="Action: count_words",
        )

        # Register the handlers inside a scope so they are removed when the
        # test ends; handlers left on the shared bus would keep firing (and
        # appending to these lists) during unrelated tests.
        with crewai_event_bus.scoped_handlers():

            @crewai_event_bus.on(ToolUsageStartedEvent)
            def _on_started(_source, event):
                if event.tool_name == tool_name:
                    started_events.append(event)

            @crewai_event_bus.on(ToolUsageFinishedEvent)
            def _on_finished(_source, event):
                if event.tool_name == tool_name:
                    finished_events.append(event)

            with patch(
                "crewai.agents.step_executor.execute_tool_and_check_finality",
                return_value=ToolResult(result="2", result_as_answer=False),
            ):
                output = step_executor._execute_text_tool_with_events(action)

            # Flush while the handlers are still registered so any pending
            # event dispatch reaches them before the scope is torn down.
            crewai_event_bus.flush()

        assert output == "2"
        assert len(started_events) >= 1
        assert len(finished_events) >= 1
@patch("crewai.experimental.agent_executor.handle_output_parser_exception")
def test_recover_from_parser_error(
self, mock_handle_exception, mock_dependencies