Compare commits

...

5 Commits

Author SHA1 Message Date
Devin AI
703e5079a0 fix: remove unused type: ignore[misc] on ask_for_human_input property
Co-Authored-By: João <joao@crewai.com>
2026-06-07 23:51:55 +00:00
Devin AI
37b4db3f15 fix: add type ignore for mypy no-any-return on ask_for_human_input property
Co-Authored-By: João <joao@crewai.com>
2026-06-07 23:48:01 +00:00
Devin AI
6161190c5c fix: remove xfail from test_crew_train_success now that _format_feedback_message is implemented
Co-Authored-By: João <joao@crewai.com>
2026-06-07 23:44:20 +00:00
Devin AI
847aee8906 fix: apply ruff format to agent_executor.py
Co-Authored-By: João <joao@crewai.com>
2026-06-07 23:41:53 +00:00
Devin AI
34afc71f80 Fix #6065: Add ExecutorContext protocol compliance to experimental AgentExecutor
The new default AgentExecutor (Flow-based) did not expose ask_for_human_input
as a direct attribute — it only stored it in self.state.ask_for_human_input.
This caused AttributeError when human_input=True was set on a Task, because
SyncHumanInputProvider reads/writes context.ask_for_human_input directly.

Changes:
- Add ask_for_human_input property (getter+setter) delegating to state
- Add _invoke_loop() and _ainvoke_loop() for re-running agent after feedback
- Add _format_feedback_message() for formatting human feedback as LLM messages
- Add 12 regression tests covering ExecutorContext protocol compliance

Co-Authored-By: João <joao@crewai.com>
2026-06-07 23:39:48 +00:00
3 changed files with 225 additions and 6 deletions

View File

@@ -279,6 +279,16 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
"""Set state messages."""
self._state.messages = value
@property
def ask_for_human_input(self) -> bool:
"""Compatibility property - delegates to state for ExecutorContext protocol."""
return self._state.ask_for_human_input # type: ignore[no-any-return]
@ask_for_human_input.setter
def ask_for_human_input(self, value: bool) -> None:
"""Set state ask_for_human_input."""
self._state.ask_for_human_input = value
@start()
def generate_plan(self) -> None:
"""Generate execution plan if planning is enabled.
@@ -3071,6 +3081,60 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
formatted_answer, cast("AsyncExecutorContext", self)
)
def _invoke_loop(self) -> AgentFinish:
"""Re-run the agent execution loop (used by human feedback providers).
Resets iteration bookkeeping and re-runs the Flow so the agent can
incorporate human feedback into a new answer.
Returns:
Final answer from the agent.
"""
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
self._finalize_called = False
self.kickoff()
answer = self.state.current_answer
if not isinstance(answer, AgentFinish):
raise RuntimeError("Agent execution ended without reaching a final answer.")
return answer
async def _ainvoke_loop(self) -> AgentFinish:
"""Re-run the agent execution loop asynchronously.
Async counterpart of ``_invoke_loop`` for async human feedback flows.
Returns:
Final answer from the agent.
"""
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
self._finalize_called = False
await self.kickoff_async()
answer = self.state.current_answer
if not isinstance(answer, AgentFinish):
raise RuntimeError("Agent execution ended without reaching a final answer.")
return answer
def _format_feedback_message(self, feedback: str) -> LLMMessage:
"""Format human feedback as a message for the LLM.
Args:
feedback: User feedback string.
Returns:
Formatted message dict.
"""
return format_message_for_llm(
I18N_DEFAULT.slice("feedback_instructions").format(feedback=feedback)
)
def _is_training_mode(self) -> bool:
"""Check if training mode is active.

View File

@@ -2224,3 +2224,164 @@ class TestVisionImageFormatContract:
assert hasattr(AnthropicCompletion, "_convert_image_blocks"), (
"Anthropic provider must have _convert_image_blocks for auto-conversion"
)
class TestHumanInputProtocolCompliance:
"""AgentExecutor must satisfy the ExecutorContext protocol so that
human_input=True on a Task works with the experimental executor.
Regression tests for https://github.com/crewAIInc/crewAI/issues/6065
"""
def test_ask_for_human_input_property_reads_state(self):
"""ask_for_human_input property delegates to state."""
executor = _build_executor()
assert executor.ask_for_human_input is False
executor._state.ask_for_human_input = True
assert executor.ask_for_human_input is True
def test_ask_for_human_input_property_writes_state(self):
"""Setting ask_for_human_input propagates to state."""
executor = _build_executor()
executor.ask_for_human_input = True
assert executor._state.ask_for_human_input is True
executor.ask_for_human_input = False
assert executor._state.ask_for_human_input is False
def test_executor_has_format_feedback_message(self):
"""_format_feedback_message must exist and return an LLM message."""
executor = _build_executor()
msg = executor._format_feedback_message("please improve")
assert isinstance(msg, dict)
assert "role" in msg
assert "please improve" in msg.get("content", "")
def test_executor_has_invoke_loop(self):
"""_invoke_loop must exist as a callable method."""
executor = _build_executor()
assert callable(getattr(executor, "_invoke_loop", None))
def test_executor_has_ainvoke_loop(self):
"""_ainvoke_loop must exist as a callable method."""
executor = _build_executor()
assert callable(getattr(executor, "_ainvoke_loop", None))
def test_executor_has_is_training_mode(self):
"""_is_training_mode must exist and return False when no crew."""
executor = _build_executor(crew=None)
assert executor._is_training_mode() is False
def test_executor_has_handle_crew_training_output(self):
"""_handle_crew_training_output must exist as a callable method."""
executor = _build_executor()
assert callable(getattr(executor, "_handle_crew_training_output", None))
def test_executor_context_protocol_attributes(self):
"""AgentExecutor must expose all attributes required by ExecutorContext."""
from crewai.core.providers.human_input import ExecutorContext
required_attrs = ["task", "crew", "messages", "ask_for_human_input", "llm", "agent"]
required_methods = ["_invoke_loop", "_is_training_mode", "_handle_crew_training_output", "_format_feedback_message"]
executor = _build_executor(
task=Mock(), crew=Mock(), agent=Mock(), llm=Mock()
)
for attr in required_attrs:
assert hasattr(executor, attr), f"Missing ExecutorContext attribute: {attr}"
for method in required_methods:
assert callable(getattr(executor, method, None)), f"Missing ExecutorContext method: {method}"
def test_invoke_loop_resets_state_and_runs_flow(self):
"""_invoke_loop must reset iteration state and re-run the flow."""
executor = _build_executor()
executor.state.iterations = 10
executor.state.is_finished = True
executor._finalize_called = True
expected_answer = AgentFinish(
thought="done", output="result", text="result"
)
def fake_kickoff():
executor.state.current_answer = expected_answer
with patch.object(executor, "kickoff", side_effect=fake_kickoff):
result = executor._invoke_loop()
assert result is expected_answer
assert executor.state.iterations == 0
def test_ainvoke_loop_resets_state_and_runs_flow(self):
"""_ainvoke_loop must reset iteration state and re-run the async flow."""
executor = _build_executor()
executor.state.iterations = 5
executor.state.is_finished = True
executor._finalize_called = True
expected_answer = AgentFinish(
thought="done", output="async result", text="async result"
)
async def fake_kickoff_async():
executor.state.current_answer = expected_answer
async def run_test():
with patch.object(executor, "kickoff_async", side_effect=fake_kickoff_async):
return await executor._ainvoke_loop()
result = asyncio.run(run_test())
assert result is expected_answer
assert executor.state.iterations == 0
def test_handle_human_feedback_uses_provider(self):
"""_handle_human_feedback must delegate to the active HumanInputProvider."""
executor = _build_executor()
answer = AgentFinish(thought="t", output="o", text="t")
mock_provider = Mock()
mock_provider.handle_feedback.return_value = answer
with patch(
"crewai.experimental.agent_executor.get_provider",
return_value=mock_provider,
):
result = executor._handle_human_feedback(answer)
mock_provider.handle_feedback.assert_called_once()
assert result is answer
def test_human_input_flag_set_via_invoke_inputs(self):
"""invoke() must set ask_for_human_input from inputs dict."""
executor = _build_executor(
llm=Mock(supports_stop_words=Mock(return_value=False)),
agent=Mock(verbose=False, planning_enabled=False),
prompt={"prompt": "{input}"},
)
expected_answer = AgentFinish(
thought="done", output="result", text="result"
)
def fake_kickoff():
executor.state.current_answer = expected_answer
with (
patch.object(executor, "kickoff", side_effect=fake_kickoff),
patch.object(executor, "_save_to_memory"),
patch.object(executor, "_handle_human_feedback", return_value=expected_answer) as mock_hf,
patch.object(executor, "_inject_files_from_inputs"),
patch.object(executor, "_format_prompt", side_effect=lambda p, i: p),
):
executor.invoke({
"input": "test",
"tool_names": "",
"tools": "",
"ask_for_human_input": True,
})
mock_hf.assert_called_once_with(expected_answer)

View File

@@ -2908,12 +2908,6 @@ def test_manager_agent_with_tools_raises_exception(researcher, writer):
crew.kickoff()
@pytest.mark.xfail(
strict=True,
reason="crew.train() relies on CrewAgentExecutor._format_feedback_message; "
"AgentExecutor (the new default) does not implement training feedback yet. "
"Remove this xfail once training is migrated to AgentExecutor.",
)
@pytest.mark.vcr()
def test_crew_train_success(researcher, writer, monkeypatch):
task = Task(