diff --git a/lib/crewai/src/crewai/agent/planning_config.py b/lib/crewai/src/crewai/agent/planning_config.py index 957ecb33c..c57b60d0a 100644 --- a/lib/crewai/src/crewai/agent/planning_config.py +++ b/lib/crewai/src/crewai/agent/planning_config.py @@ -74,7 +74,7 @@ class PlanningConfig(BaseModel): """ reasoning_effort: Literal["low", "medium", "high"] = Field( - default="low", + default="medium", description=( "Controls post-step observation and replanning behavior. " "'low' observes steps but skips replanning/refinement (fastest). " diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 8610b08f0..02ffd579a 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -374,10 +374,9 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): if self.state.plan_ready and output.plan.steps: self._create_todos_from_plan(output.plan.steps) - # Backward compatibility: append plan to task description - # This can be removed in Phase 2 when plan execution is implemented - if self.task and self.state.plan: - self.task.description += f"\n\nPlanning:\n{self.state.plan}" + # Plan is stored in state.plan and used by the execution flow. + # Do NOT mutate task.description — it's a shared object that + # accumulates plan text on re-invoke. except Exception as e: if hasattr(self.agent, "_logger"): @@ -667,17 +666,18 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): ) return "replan_now" - # Step failed but observer does not require a full replan — continue - self.state.todos.mark_completed( + # Step failed but observer does not require a full replan — mark as + # failed (not completed) so get_failed_todos() tracks it correctly. + self.state.todos.mark_failed( current_todo.step_number, result=current_todo.result ) if self.agent.verbose: - completed = self.state.todos.completed_count + failed = len(self.state.todos.get_failed_todos()) total = len(self.state.todos.items) self._printer.print( content=( f"[Medium] Step {current_todo.step_number} failed but no replan needed " - f"({completed}/{total}) — continuing" + f"({failed} failed/{total} total) — continuing" ), color="yellow", ) diff --git a/lib/crewai/src/crewai/utilities/planning_types.py b/lib/crewai/src/crewai/utilities/planning_types.py index d1c11526c..5e75d00b5 100644 --- a/lib/crewai/src/crewai/utilities/planning_types.py +++ b/lib/crewai/src/crewai/utilities/planning_types.py @@ -99,7 +99,7 @@ class TodoList(BaseModel): item = self.get_by_step_number(step_number) if item: item.status = "completed" - if result: + if result is not None: item.result = result def mark_failed(self, step_number: int, result: str | None = None) -> None: @@ -107,21 +107,27 @@ class TodoList(BaseModel): item = self.get_by_step_number(step_number) if item: item.status = "failed" - if result: + if result is not None: item.result = result def _dependencies_satisfied(self, item: TodoItem) -> bool: - """Check if all dependencies for a todo item are completed. + """Check if all dependencies for a todo item are in a terminal state. + + A dependency is satisfied when it has finished executing — either + successfully (completed) or not (failed). This prevents downstream + todos from being permanently blocked when a dependency fails. + The executor/observer is responsible for deciding whether to skip, + replan, or continue when a dependency has failed. Args: item: The todo item to check dependencies for. Returns: - True if all dependencies are completed, False otherwise. + True if all dependencies are in a terminal state, False otherwise. """ for dep_num in item.depends_on: dep = self.get_by_step_number(dep_num) - if dep is None or dep.status != "completed": + if dep is None or dep.status not in ("completed", "failed"): return False return True diff --git a/lib/crewai/tests/agents/test_agent_executor.py b/lib/crewai/tests/agents/test_agent_executor.py index 5e30d6bb7..7fe1855fe 100644 --- a/lib/crewai/tests/agents/test_agent_executor.py +++ b/lib/crewai/tests/agents/test_agent_executor.py @@ -1531,12 +1531,13 @@ class TestReasoningEffort: assert todo.status == "completed" assert todo.result == "Done successfully" - def test_planning_config_reasoning_effort_default_is_low(self): - """Verify PlanningConfig defaults reasoning_effort to 'low'.""" + def test_planning_config_reasoning_effort_default_is_medium(self): + """Verify PlanningConfig defaults reasoning_effort to 'medium' + (aligned with runtime default in _get_reasoning_effort).""" from crewai.agent.planning_config import PlanningConfig config = PlanningConfig() - assert config.reasoning_effort == "low" + assert config.reasoning_effort == "medium" def test_planning_config_reasoning_effort_validation(self): """Verify PlanningConfig rejects invalid reasoning_effort values.""" @@ -1575,15 +1576,10 @@ class TestReasoningEffort: assert executor._get_reasoning_effort() == "medium" -# ========================================================================= -# P0 Bug Fix Tests -# ========================================================================= - -class TestP0ObserverParseResponse: - """P0 #1: PlannerObserver._parse_observation_response must handle - JSON strings, dicts, and StepObservation instances — not silently - fall back to a hardcoded success default.""" +class TestObserverResponseParsing: + """PlannerObserver must correctly parse LLM responses regardless of + the format returned (StepObservation, JSON string, dict).""" def test_parse_step_observation_instance(self): """Direct StepObservation instance passes through unchanged.""" @@ -1603,7 +1599,7 @@ class TestP0ObserverParseResponse: assert result.needs_full_replan is True def test_parse_json_string(self): - """JSON string (the path that was broken before the fix) is parsed correctly.""" + """JSON string from non-streaming LLM path is parsed correctly.""" import json from crewai.agents.planner_observer import PlannerObserver @@ -1643,7 +1639,7 @@ class TestP0ObserverParseResponse: assert result.key_information_learned == "found 3 files" def test_parse_dict_response(self): - """Dict response (some provider paths) is parsed correctly.""" + """Dict response from some provider paths is parsed correctly.""" from crewai.agents.planner_observer import PlannerObserver from crewai.utilities.planning_types import StepObservation @@ -1662,14 +1658,13 @@ class TestP0ObserverParseResponse: assert result.replan_reason == "step timed out" def test_parse_unparseable_falls_back_gracefully(self): - """Totally unparseable response falls back to default but logs a warning.""" + """Totally unparseable response falls back to default.""" from crewai.agents.planner_observer import PlannerObserver from crewai.utilities.planning_types import StepObservation result = PlannerObserver._parse_observation_response(12345) assert isinstance(result, StepObservation) - # Fallback defaults to success (conservative — don't wipe the plan) assert result.step_completed_successfully is True assert result.remaining_plan_still_valid is True @@ -1680,7 +1675,6 @@ class TestP0ObserverParseResponse: from crewai.agents.planner_observer import PlannerObserver from crewai.utilities.planning_types import StepObservation, TodoItem - # Simulate llm.call() returning a JSON string (the broken path) llm = Mock() llm.call.return_value = json.dumps({ "step_completed_successfully": False, @@ -1709,31 +1703,31 @@ class TestP0ObserverParseResponse: remaining_todos=[], ) - # The critical assertion: the observer actually parsed the LLM's judgment - # instead of falling back to the hardcoded success default assert observation.step_completed_successfully is False assert observation.needs_full_replan is True assert observation.replan_reason == "build system is misconfigured" -class TestP0MaxIterationsRouting: - """P0 #2: check_max_iterations must route to force_final_answer, - not to a dead-end 'max_iterations_exceeded' event.""" +# ========================================================================= +# Max Iterations Routing +# ========================================================================= - def test_check_max_iterations_returns_force_final_answer(self): - """When max iterations are exceeded, route to 'force_final_answer'.""" + +class TestMaxIterationsRouting: + """check_max_iterations must route to force_final_answer when + the iteration limit is exceeded, not to a dead-end event.""" + + def test_exceeded_routes_to_force_final_answer(self): from crewai.experimental.agent_executor import AgentExecutor executor = Mock(spec=AgentExecutor) executor.state = AgentExecutorState(iterations=25) executor.max_iter = 20 - # Call the unbound method with our mock as self result = AgentExecutor.check_max_iterations(executor) assert result == "force_final_answer" - def test_check_max_iterations_continues_when_under_limit(self): - """When under the limit, route to continue_reasoning.""" + def test_under_limit_continues_reasoning(self): from crewai.experimental.agent_executor import AgentExecutor executor = Mock(spec=AgentExecutor) @@ -1743,8 +1737,7 @@ class TestP0MaxIterationsRouting: result = AgentExecutor.check_max_iterations(executor) assert result == "continue_reasoning" - def test_check_max_iterations_native_tools_path(self): - """When under limit with native tools, route to continue_reasoning_native.""" + def test_under_limit_with_native_tools(self): from crewai.experimental.agent_executor import AgentExecutor executor = Mock(spec=AgentExecutor) @@ -1755,45 +1748,38 @@ class TestP0MaxIterationsRouting: assert result == "continue_reasoning_native" -class TestP0UnboundResultNativeToolCall: - """P0 #3: _execute_single_native_tool_call must not leave `result` - unbound when max_usage_reached=True but original_tool is None.""" +# ========================================================================= +# Native Tool Call Edge Cases +# ========================================================================= + + +class TestNativeToolCallMaxUsage: + """_execute_single_native_tool_call must produce a result string + even when max_usage_reached=True and original_tool is None.""" def test_max_usage_reached_without_original_tool(self): - """When max_usage_reached and original_tool is None, still returns a result string.""" from crewai.experimental.agent_executor import AgentExecutor - # Build a minimal executor mock - executor = Mock(spec=AgentExecutor) - executor.agent = Mock() - executor.agent.verbose = False - executor.task = None - executor.crew = None - executor._available_functions = {} - executor.tools_handler = None - - # Call the actual method — it should not raise UnboundLocalError - # We need to construct the right arguments - # The method signature requires specific args, let's just verify - # the branch logic by checking the source was patched import inspect source = inspect.getsource(AgentExecutor._execute_single_native_tool_call) - # The fix: max_usage_reached branch no longer requires original_tool assert "elif max_usage_reached:" in source assert 'result = f"Tool \'{func_name}\' has reached its maximum usage limit' in source -class TestFinalizeCalledReset: - """_finalize_called must be reset on re-invoke to prevent - second invocations from returning immediately with no output.""" +# ========================================================================= +# Executor State Reset on Re-invoke +# ========================================================================= + + +class TestExecutorStateReset: + """invoke() and invoke_async() must reset all execution state + (including _finalize_called) so re-invocations work correctly.""" def test_finalize_called_reset_in_invoke(self): - """invoke() resets _finalize_called before execution.""" import inspect from crewai.experimental.agent_executor import AgentExecutor source = inspect.getsource(AgentExecutor.invoke) - # _finalize_called = False should appear before messages.clear() finalize_idx = source.index("self._finalize_called = False") messages_idx = source.index("self.state.messages.clear()") assert finalize_idx < messages_idx, ( @@ -1801,7 +1787,6 @@ class TestFinalizeCalledReset: ) def test_finalize_called_reset_in_invoke_async(self): - """invoke_async() resets _finalize_called before execution.""" import inspect from crewai.experimental.agent_executor import AgentExecutor @@ -1811,3 +1796,208 @@ class TestFinalizeCalledReset: assert finalize_idx < messages_idx, ( "_finalize_called must be reset before state reset in async path" ) + + +# ========================================================================= +# Plan Generation Isolation +# ========================================================================= + + +class TestPlanGenerationIsolation: + """generate_plan must store the plan in state only — never mutate + the shared task.description object.""" + + def test_generate_plan_does_not_mutate_task_description(self): + import inspect + from crewai.experimental.agent_executor import AgentExecutor + + source = inspect.getsource(AgentExecutor.generate_plan) + assert "task.description +=" not in source, ( + "generate_plan still mutates task.description" + ) + assert "task.description =" not in source or "Plan is stored in state" in source, ( + "generate_plan should store plan in state, not task.description" + ) + + +# ========================================================================= +# Todo Status Tracking +# ========================================================================= + + +class TestTodoStatusTracking: + """Steps that fail without triggering a replan must be marked 'failed' + (not 'completed') so status queries remain accurate.""" + + def test_medium_effort_marks_failed_step_as_failed(self): + import inspect + from crewai.experimental.agent_executor import AgentExecutor + + source = inspect.getsource(AgentExecutor.handle_step_observed_medium) + assert "mark_failed" in source, ( + "handle_step_observed_medium should use mark_failed for failed steps" + ) + failed_no_replan_idx = source.index("failed but no replan") + after_comment = source[failed_no_replan_idx:] + assert "mark_completed" not in after_comment, ( + "mark_completed should not be called on failed steps" + ) + + def test_failed_step_appears_in_get_failed_todos(self): + from crewai.utilities.planning_types import TodoItem, TodoList + + todos = TodoList(items=[ + TodoItem(step_number=1, description="Step 1"), + TodoItem(step_number=2, description="Step 2"), + ]) + + todos.mark_running(1) + todos.mark_failed(1, result="Error: build failed") + + failed = todos.get_failed_todos() + assert len(failed) == 1 + assert failed[0].step_number == 1 + assert failed[0].result == "Error: build failed" + + completed = todos.get_completed_todos() + assert len(completed) == 0 + + +# ========================================================================= +# TodoList Result Handling +# ========================================================================= + + +class TestTodoResultHandling: + """mark_completed/mark_failed must use `is not None` checks so + empty-string results are preserved.""" + + def test_mark_completed_preserves_empty_string(self): + from crewai.utilities.planning_types import TodoItem, TodoList + + todos = TodoList(items=[ + TodoItem(step_number=1, description="Step 1"), + ]) + todos.mark_completed(1, result="") + item = todos.get_by_step_number(1) + assert item.status == "completed" + assert item.result == "", "Empty-string result should be stored, not dropped" + + def test_mark_failed_preserves_empty_string(self): + from crewai.utilities.planning_types import TodoItem, TodoList + + todos = TodoList(items=[ + TodoItem(step_number=1, description="Step 1"), + ]) + todos.mark_failed(1, result="") + item = todos.get_by_step_number(1) + assert item.status == "failed" + assert item.result == "", "Empty-string result should be stored, not dropped" + + def test_mark_completed_none_does_not_overwrite(self): + from crewai.utilities.planning_types import TodoItem, TodoList + + todos = TodoList(items=[ + TodoItem(step_number=1, description="Step 1", result="existing"), + ]) + todos.mark_completed(1, result=None) + item = todos.get_by_step_number(1) + assert item.result == "existing", "None result should not overwrite existing" + + +# ========================================================================= +# Dependency Resolution with Failed Steps +# ========================================================================= + + +class TestDependencyResolutionWithFailures: + """Failed dependencies must be treated as terminal so downstream + todos are not permanently blocked.""" + + def test_failed_dep_unblocks_downstream(self): + from crewai.utilities.planning_types import TodoItem, TodoList + + todos = TodoList(items=[ + TodoItem(step_number=1, description="Build"), + TodoItem(step_number=2, description="Test", depends_on=[1]), + TodoItem(step_number=3, description="Deploy", depends_on=[2]), + ]) + + todos.mark_running(1) + todos.mark_failed(1, result="build error") + + ready = todos.get_ready_todos() + assert len(ready) == 1 + assert ready[0].step_number == 2 + + def test_is_complete_with_mixed_terminal_states(self): + from crewai.utilities.planning_types import TodoItem, TodoList + + todos = TodoList(items=[ + TodoItem(step_number=1, description="A", status="completed"), + TodoItem(step_number=2, description="B", status="failed"), + TodoItem(step_number=3, description="C", status="completed"), + ]) + assert todos.is_complete is True + + def test_pending_todo_ready_when_dep_failed(self): + from crewai.utilities.planning_types import TodoItem, TodoList + + todos = TodoList(items=[ + TodoItem(step_number=1, description="A", status="failed"), + TodoItem(step_number=2, description="B", depends_on=[1], status="pending"), + ]) + ready = todos.get_ready_todos() + assert len(ready) == 1, "Downstream todo should be ready when dep is failed" + + +# ========================================================================= +# PlanningConfig Defaults +# ========================================================================= + + +class TestPlanningConfigDefaults: + """PlanningConfig default reasoning_effort must be 'medium' to match + the runtime fallback in _get_reasoning_effort.""" + + def test_planning_config_default_is_medium(self): + from crewai.agent.planning_config import PlanningConfig + + config = PlanningConfig() + assert config.reasoning_effort == "medium", ( + f"Default should be 'medium', got '{config.reasoning_effort}'" + ) + + def test_explicit_config_matches_implicit_planning(self): + """Agent(planning=True) and Agent(planning=True, planning_config=PlanningConfig()) + should produce the same reasoning_effort.""" + from crewai.agent.planning_config import PlanningConfig + + config = PlanningConfig() + assert config.reasoning_effort == "medium" + + +# ========================================================================= +# Vision Image Format Contract +# ========================================================================= + + +class TestVisionImageFormatContract: + """step_executor uses standard image_url format; each provider's + _format_messages handles conversion to its native format.""" + + def test_step_executor_uses_standard_image_url_format(self): + import inspect + from crewai.agents.step_executor import StepExecutor + + source = inspect.getsource(StepExecutor._build_observation_message) + assert "image_url" in source, ( + "Step executor should use standard image_url format" + ) + + def test_anthropic_provider_has_image_block_converter(self): + from crewai.llms.providers.anthropic.completion import AnthropicCompletion + + assert hasattr(AnthropicCompletion, "_convert_image_blocks"), ( + "Anthropic provider must have _convert_image_blocks for auto-conversion" + )