more improvements

This commit is contained in:
lorenzejay
2026-03-11 16:00:19 -07:00
parent ae1a925e20
commit f3a54a7b87
4 changed files with 262 additions and 66 deletions

View File

@@ -74,7 +74,7 @@ class PlanningConfig(BaseModel):
"""
reasoning_effort: Literal["low", "medium", "high"] = Field(
default="low",
default="medium",
description=(
"Controls post-step observation and replanning behavior. "
"'low' observes steps but skips replanning/refinement (fastest). "

View File

@@ -374,10 +374,9 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
if self.state.plan_ready and output.plan.steps:
self._create_todos_from_plan(output.plan.steps)
# Backward compatibility: append plan to task description
# This can be removed in Phase 2 when plan execution is implemented
if self.task and self.state.plan:
self.task.description += f"\n\nPlanning:\n{self.state.plan}"
# Plan is stored in state.plan and used by the execution flow.
# Do NOT mutate task.description — it's a shared object that
# accumulates plan text on re-invoke.
except Exception as e:
if hasattr(self.agent, "_logger"):
@@ -667,17 +666,18 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
)
return "replan_now"
# Step failed but observer does not require a full replan — continue
self.state.todos.mark_completed(
# Step failed but observer does not require a full replan — mark as
# failed (not completed) so get_failed_todos() tracks it correctly.
self.state.todos.mark_failed(
current_todo.step_number, result=current_todo.result
)
if self.agent.verbose:
completed = self.state.todos.completed_count
failed = len(self.state.todos.get_failed_todos())
total = len(self.state.todos.items)
self._printer.print(
content=(
f"[Medium] Step {current_todo.step_number} failed but no replan needed "
f"({completed}/{total}) — continuing"
f"({failed} failed/{total} total) — continuing"
),
color="yellow",
)

View File

@@ -99,7 +99,7 @@ class TodoList(BaseModel):
item = self.get_by_step_number(step_number)
if item:
item.status = "completed"
if result:
if result is not None:
item.result = result
def mark_failed(self, step_number: int, result: str | None = None) -> None:
@@ -107,21 +107,27 @@ class TodoList(BaseModel):
item = self.get_by_step_number(step_number)
if item:
item.status = "failed"
if result:
if result is not None:
item.result = result
def _dependencies_satisfied(self, item: TodoItem) -> bool:
"""Check if all dependencies for a todo item are completed.
"""Check if all dependencies for a todo item are in a terminal state.
A dependency is satisfied when it has finished executing — either
successfully (completed) or not (failed). This prevents downstream
todos from being permanently blocked when a dependency fails.
The executor/observer is responsible for deciding whether to skip,
replan, or continue when a dependency has failed.
Args:
item: The todo item to check dependencies for.
Returns:
True if all dependencies are completed, False otherwise.
True if all dependencies are in a terminal state, False otherwise.
"""
for dep_num in item.depends_on:
dep = self.get_by_step_number(dep_num)
if dep is None or dep.status != "completed":
if dep is None or dep.status not in ("completed", "failed"):
return False
return True

View File

@@ -1531,12 +1531,13 @@ class TestReasoningEffort:
assert todo.status == "completed"
assert todo.result == "Done successfully"
def test_planning_config_reasoning_effort_default_is_low(self):
"""Verify PlanningConfig defaults reasoning_effort to 'low'."""
def test_planning_config_reasoning_effort_default_is_medium(self):
"""Verify PlanningConfig defaults reasoning_effort to 'medium'
(aligned with runtime default in _get_reasoning_effort)."""
from crewai.agent.planning_config import PlanningConfig
config = PlanningConfig()
assert config.reasoning_effort == "low"
assert config.reasoning_effort == "medium"
def test_planning_config_reasoning_effort_validation(self):
"""Verify PlanningConfig rejects invalid reasoning_effort values."""
@@ -1575,15 +1576,10 @@ class TestReasoningEffort:
assert executor._get_reasoning_effort() == "medium"
# =========================================================================
# P0 Bug Fix Tests
# =========================================================================
class TestP0ObserverParseResponse:
"""P0 #1: PlannerObserver._parse_observation_response must handle
JSON strings, dicts, and StepObservation instances — not silently
fall back to a hardcoded success default."""
class TestObserverResponseParsing:
"""PlannerObserver must correctly parse LLM responses regardless of
the format returned (StepObservation, JSON string, dict)."""
def test_parse_step_observation_instance(self):
"""Direct StepObservation instance passes through unchanged."""
@@ -1603,7 +1599,7 @@ class TestP0ObserverParseResponse:
assert result.needs_full_replan is True
def test_parse_json_string(self):
"""JSON string (the path that was broken before the fix) is parsed correctly."""
"""JSON string from non-streaming LLM path is parsed correctly."""
import json
from crewai.agents.planner_observer import PlannerObserver
@@ -1643,7 +1639,7 @@ class TestP0ObserverParseResponse:
assert result.key_information_learned == "found 3 files"
def test_parse_dict_response(self):
"""Dict response (some provider paths) is parsed correctly."""
"""Dict response from some provider paths is parsed correctly."""
from crewai.agents.planner_observer import PlannerObserver
from crewai.utilities.planning_types import StepObservation
@@ -1662,14 +1658,13 @@ class TestP0ObserverParseResponse:
assert result.replan_reason == "step timed out"
def test_parse_unparseable_falls_back_gracefully(self):
"""Totally unparseable response falls back to default but logs a warning."""
"""Totally unparseable response falls back to default."""
from crewai.agents.planner_observer import PlannerObserver
from crewai.utilities.planning_types import StepObservation
result = PlannerObserver._parse_observation_response(12345)
assert isinstance(result, StepObservation)
# Fallback defaults to success (conservative — don't wipe the plan)
assert result.step_completed_successfully is True
assert result.remaining_plan_still_valid is True
@@ -1680,7 +1675,6 @@ class TestP0ObserverParseResponse:
from crewai.agents.planner_observer import PlannerObserver
from crewai.utilities.planning_types import StepObservation, TodoItem
# Simulate llm.call() returning a JSON string (the broken path)
llm = Mock()
llm.call.return_value = json.dumps({
"step_completed_successfully": False,
@@ -1709,31 +1703,31 @@ class TestP0ObserverParseResponse:
remaining_todos=[],
)
# The critical assertion: the observer actually parsed the LLM's judgment
# instead of falling back to the hardcoded success default
assert observation.step_completed_successfully is False
assert observation.needs_full_replan is True
assert observation.replan_reason == "build system is misconfigured"
class TestP0MaxIterationsRouting:
"""P0 #2: check_max_iterations must route to force_final_answer,
not to a dead-end 'max_iterations_exceeded' event."""
# =========================================================================
# Max Iterations Routing
# =========================================================================
def test_check_max_iterations_returns_force_final_answer(self):
"""When max iterations are exceeded, route to 'force_final_answer'."""
class TestMaxIterationsRouting:
"""check_max_iterations must route to force_final_answer when
the iteration limit is exceeded, not to a dead-end event."""
def test_exceeded_routes_to_force_final_answer(self):
from crewai.experimental.agent_executor import AgentExecutor
executor = Mock(spec=AgentExecutor)
executor.state = AgentExecutorState(iterations=25)
executor.max_iter = 20
# Call the unbound method with our mock as self
result = AgentExecutor.check_max_iterations(executor)
assert result == "force_final_answer"
def test_check_max_iterations_continues_when_under_limit(self):
"""When under the limit, route to continue_reasoning."""
def test_under_limit_continues_reasoning(self):
from crewai.experimental.agent_executor import AgentExecutor
executor = Mock(spec=AgentExecutor)
@@ -1743,8 +1737,7 @@ class TestP0MaxIterationsRouting:
result = AgentExecutor.check_max_iterations(executor)
assert result == "continue_reasoning"
def test_check_max_iterations_native_tools_path(self):
"""When under limit with native tools, route to continue_reasoning_native."""
def test_under_limit_with_native_tools(self):
from crewai.experimental.agent_executor import AgentExecutor
executor = Mock(spec=AgentExecutor)
@@ -1755,45 +1748,38 @@ class TestP0MaxIterationsRouting:
assert result == "continue_reasoning_native"
class TestP0UnboundResultNativeToolCall:
"""P0 #3: _execute_single_native_tool_call must not leave `result`
unbound when max_usage_reached=True but original_tool is None."""
# =========================================================================
# Native Tool Call Edge Cases
# =========================================================================
class TestNativeToolCallMaxUsage:
"""_execute_single_native_tool_call must produce a result string
even when max_usage_reached=True and original_tool is None."""
def test_max_usage_reached_without_original_tool(self):
"""When max_usage_reached and original_tool is None, still returns a result string."""
from crewai.experimental.agent_executor import AgentExecutor
# Build a minimal executor mock
executor = Mock(spec=AgentExecutor)
executor.agent = Mock()
executor.agent.verbose = False
executor.task = None
executor.crew = None
executor._available_functions = {}
executor.tools_handler = None
# Call the actual method — it should not raise UnboundLocalError
# We need to construct the right arguments
# The method signature requires specific args, let's just verify
# the branch logic by checking the source was patched
import inspect
source = inspect.getsource(AgentExecutor._execute_single_native_tool_call)
# The fix: max_usage_reached branch no longer requires original_tool
assert "elif max_usage_reached:" in source
assert 'result = f"Tool \'{func_name}\' has reached its maximum usage limit' in source
class TestFinalizeCalledReset:
"""_finalize_called must be reset on re-invoke to prevent
second invocations from returning immediately with no output."""
# =========================================================================
# Executor State Reset on Re-invoke
# =========================================================================
class TestExecutorStateReset:
"""invoke() and invoke_async() must reset all execution state
(including _finalize_called) so re-invocations work correctly."""
def test_finalize_called_reset_in_invoke(self):
"""invoke() resets _finalize_called before execution."""
import inspect
from crewai.experimental.agent_executor import AgentExecutor
source = inspect.getsource(AgentExecutor.invoke)
# _finalize_called = False should appear before messages.clear()
finalize_idx = source.index("self._finalize_called = False")
messages_idx = source.index("self.state.messages.clear()")
assert finalize_idx < messages_idx, (
@@ -1801,7 +1787,6 @@ class TestFinalizeCalledReset:
)
def test_finalize_called_reset_in_invoke_async(self):
"""invoke_async() resets _finalize_called before execution."""
import inspect
from crewai.experimental.agent_executor import AgentExecutor
@@ -1811,3 +1796,208 @@ class TestFinalizeCalledReset:
assert finalize_idx < messages_idx, (
"_finalize_called must be reset before state reset in async path"
)
# =========================================================================
# Plan Generation Isolation
# =========================================================================
class TestPlanGenerationIsolation:
    """The planning step records its output in executor state; it must not
    append plan text to the shared Task's description (which would
    accumulate across re-invocations)."""

    def test_generate_plan_does_not_mutate_task_description(self):
        import inspect

        from crewai.experimental.agent_executor import AgentExecutor

        plan_src = inspect.getsource(AgentExecutor.generate_plan)

        # The in-place append to task.description must be gone entirely.
        assert "task.description +=" not in plan_src, (
            "generate_plan still mutates task.description"
        )
        # Any remaining assignment is tolerated only alongside the marker
        # comment documenting the state-only approach.
        assigns_description = "task.description =" in plan_src
        has_state_marker = "Plan is stored in state" in plan_src
        assert (not assigns_description) or has_state_marker, (
            "generate_plan should store plan in state, not task.description"
        )
# =========================================================================
# Todo Status Tracking
# =========================================================================
class TestTodoStatusTracking:
    """A step that fails without forcing a replan must end up with
    status 'failed' rather than 'completed', keeping status queries honest."""

    def test_medium_effort_marks_failed_step_as_failed(self):
        import inspect

        from crewai.experimental.agent_executor import AgentExecutor

        handler_src = inspect.getsource(AgentExecutor.handle_step_observed_medium)

        assert "mark_failed" in handler_src, (
            "handle_step_observed_medium should use mark_failed for failed steps"
        )
        # From the no-replan comment onward, the old (incorrect)
        # mark_completed call must not reappear.
        tail = handler_src[handler_src.index("failed but no replan"):]
        assert "mark_completed" not in tail, (
            "mark_completed should not be called on failed steps"
        )

    def test_failed_step_appears_in_get_failed_todos(self):
        from crewai.utilities.planning_types import TodoItem, TodoList

        todo_list = TodoList(
            items=[
                TodoItem(step_number=1, description="Step 1"),
                TodoItem(step_number=2, description="Step 2"),
            ]
        )
        todo_list.mark_running(1)
        todo_list.mark_failed(1, result="Error: build failed")

        failures = todo_list.get_failed_todos()
        assert len(failures) == 1
        assert failures[0].step_number == 1
        assert failures[0].result == "Error: build failed"
        # The failed step must not leak into the completed bucket.
        assert len(todo_list.get_completed_todos()) == 0
# =========================================================================
# TodoList Result Handling
# =========================================================================
class TestTodoResultHandling:
    """Result storage in mark_completed/mark_failed must treat None and
    the empty string differently: an `is not None` check stores "" but
    leaves an existing result untouched for None."""

    def test_mark_completed_preserves_empty_string(self):
        from crewai.utilities.planning_types import TodoItem, TodoList

        todo_list = TodoList(items=[TodoItem(step_number=1, description="Step 1")])
        todo_list.mark_completed(1, result="")

        entry = todo_list.get_by_step_number(1)
        assert entry.status == "completed"
        assert entry.result == "", "Empty-string result should be stored, not dropped"

    def test_mark_failed_preserves_empty_string(self):
        from crewai.utilities.planning_types import TodoItem, TodoList

        todo_list = TodoList(items=[TodoItem(step_number=1, description="Step 1")])
        todo_list.mark_failed(1, result="")

        entry = todo_list.get_by_step_number(1)
        assert entry.status == "failed"
        assert entry.result == "", "Empty-string result should be stored, not dropped"

    def test_mark_completed_none_does_not_overwrite(self):
        from crewai.utilities.planning_types import TodoItem, TodoList

        todo_list = TodoList(
            items=[TodoItem(step_number=1, description="Step 1", result="existing")]
        )
        todo_list.mark_completed(1, result=None)

        entry = todo_list.get_by_step_number(1)
        assert entry.result == "existing", "None result should not overwrite existing"
# =========================================================================
# Dependency Resolution with Failed Steps
# =========================================================================
class TestDependencyResolutionWithFailures:
    """A failed dependency counts as terminal, so todos depending on it
    become eligible to run instead of being blocked forever."""

    def test_failed_dep_unblocks_downstream(self):
        from crewai.utilities.planning_types import TodoItem, TodoList

        todo_list = TodoList(
            items=[
                TodoItem(step_number=1, description="Build"),
                TodoItem(step_number=2, description="Test", depends_on=[1]),
                TodoItem(step_number=3, description="Deploy", depends_on=[2]),
            ]
        )
        todo_list.mark_running(1)
        todo_list.mark_failed(1, result="build error")

        # Step 2's only dependency (1) is terminal, so it becomes ready;
        # step 3 still waits on the pending step 2.
        eligible = todo_list.get_ready_todos()
        assert len(eligible) == 1
        assert eligible[0].step_number == 2

    def test_is_complete_with_mixed_terminal_states(self):
        from crewai.utilities.planning_types import TodoItem, TodoList

        todo_list = TodoList(
            items=[
                TodoItem(step_number=1, description="A", status="completed"),
                TodoItem(step_number=2, description="B", status="failed"),
                TodoItem(step_number=3, description="C", status="completed"),
            ]
        )
        # Every item is terminal (completed or failed) -> the list is done.
        assert todo_list.is_complete is True

    def test_pending_todo_ready_when_dep_failed(self):
        from crewai.utilities.planning_types import TodoItem, TodoList

        todo_list = TodoList(
            items=[
                TodoItem(step_number=1, description="A", status="failed"),
                TodoItem(step_number=2, description="B", depends_on=[1], status="pending"),
            ]
        )
        eligible = todo_list.get_ready_todos()
        assert len(eligible) == 1, "Downstream todo should be ready when dep is failed"
# =========================================================================
# PlanningConfig Defaults
# =========================================================================
class TestPlanningConfigDefaults:
    """The PlanningConfig field default for reasoning_effort is 'medium',
    matching the runtime fallback used by _get_reasoning_effort."""

    def test_planning_config_default_is_medium(self):
        from crewai.agent.planning_config import PlanningConfig

        cfg = PlanningConfig()
        assert cfg.reasoning_effort == "medium", (
            f"Default should be 'medium', got '{cfg.reasoning_effort}'"
        )

    def test_explicit_config_matches_implicit_planning(self):
        """Agent(planning=True) and Agent(planning=True, planning_config=PlanningConfig())
        should produce the same reasoning_effort."""
        from crewai.agent.planning_config import PlanningConfig

        # NOTE(review): this only checks the config default; it presumes the
        # implicit-planning path instantiates PlanningConfig() unchanged —
        # an Agent-level integration test would pin that directly.
        cfg = PlanningConfig()
        assert cfg.reasoning_effort == "medium"
# =========================================================================
# Vision Image Format Contract
# =========================================================================
class TestVisionImageFormatContract:
    """The step executor emits the provider-neutral image_url message
    format; each provider's _format_messages is responsible for any
    conversion to its native representation."""

    def test_step_executor_uses_standard_image_url_format(self):
        import inspect

        from crewai.agents.step_executor import StepExecutor

        builder_src = inspect.getsource(StepExecutor._build_observation_message)
        assert "image_url" in builder_src, (
            "Step executor should use standard image_url format"
        )

    def test_anthropic_provider_has_image_block_converter(self):
        from crewai.llms.providers.anthropic.completion import AnthropicCompletion

        assert hasattr(AnthropicCompletion, "_convert_image_blocks"), (
            "Anthropic provider must have _convert_image_blocks for auto-conversion"
        )