From 77a61274dcb608e3e06499ea467e0d0dc8d24a94 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Tue, 26 May 2026 09:10:43 -0700 Subject: [PATCH] feat(planning): enhance planning configuration and observation handling (#5913) * feat(planning): enhance planning configuration and observation handling - Introduced `observe_steps` attribute in `PlanningConfig` to control LLM calls after each step. - Updated `Agent` to set default `max_attempts` to 1 when planning is enabled without explicit config. - Modified `PlannerObserver` to support heuristic observations when LLM calls are disabled. - Adjusted `AgentExecutor` to respect `observe_steps` and `reasoning_effort` settings for step observations. - Added tests to verify behavior of new configurations and ensure correct observation handling across different reasoning efforts. * fix(agent_executor): update handling of failed steps in low effort mode - Adjusted logic to ensure that failed steps are recorded without marking them as completed when using low reasoning effort. - Introduced feedback for failed steps, allowing the process to continue while tracking failures. - Added a test to verify that failed steps are correctly marked without triggering a replan. 
- And linted * linted --- lib/crewai/src/crewai/agent/core.py | 11 +- .../src/crewai/agent/planning_config.py | 24 ++- .../src/crewai/agents/planner_observer.py | 29 +++- .../src/crewai/experimental/agent_executor.py | 107 +++++++++++--- .../src/crewai/utilities/reasoning_handler.py | 9 +- .../tests/agents/test_agent_executor.py | 138 +++++++++++++++++- .../tests/agents/test_agent_reasoning.py | 24 +++ 7 files changed, 308 insertions(+), 34 deletions(-) diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 7310c53be..a0eae49fd 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -379,8 +379,17 @@ class Agent(BaseAgent): DeprecationWarning, stacklevel=2, ) + kwargs: dict[str, int] = {} + if self.max_reasoning_attempts is not None: + kwargs["max_attempts"] = self.max_reasoning_attempts + self.planning_config = PlanningConfig(**kwargs) + + if self.planning and self.planning_config is None: + # Bare planning=True should be bounded and avoid per-step + # PlannerObserver LLM calls unless explicitly configured. self.planning_config = PlanningConfig( - max_attempts=self.max_reasoning_attempts, + reasoning_effort="low", + max_attempts=1, ) return self diff --git a/lib/crewai/src/crewai/agent/planning_config.py b/lib/crewai/src/crewai/agent/planning_config.py index cd8124b9c..4575b6508 100644 --- a/lib/crewai/src/crewai/agent/planning_config.py +++ b/lib/crewai/src/crewai/agent/planning_config.py @@ -19,15 +19,18 @@ class PlanningConfig(BaseModel): Attributes: reasoning_effort: Controls observation and replanning after each step. - - "low": Observe each step (validates success), but skip the - decide/replan/refine pipeline. Steps are marked complete and - execution continues linearly. Fastest option. - - "medium": Observe each step. On failure, trigger replanning. + - "low": Skip per-step PlannerObserver LLM calls (heuristic only); + skip the decide/replan/refine pipeline. Fastest option. 
+ - "medium": Observe each step via LLM. On failure, trigger replanning. On success, skip refinement and continue. Balanced option. - "high": Full observation pipeline — observe every step, then route through decide_next_action which can trigger early goal achievement, full replanning, or lightweight refinement. Most adaptive but adds latency per step. + observe_steps: When True, run PlannerObserver LLM calls after each step. + When False, use a lightweight heuristic (no extra LLM call). + When None (default), LLM observation runs for "medium" and "high" + only; "low" uses the heuristic path. max_attempts: Maximum number of planning refinement attempts. If None, will continue until the agent indicates readiness. max_steps: Maximum number of steps in the generated plan. @@ -76,12 +79,21 @@ class PlanningConfig(BaseModel): default="medium", description=( "Controls post-step observation and replanning behavior. " - "'low' observes steps but skips replanning/refinement (fastest). " - "'medium' observes and replans only on step failure (balanced). " + "'low' skips per-step PlannerObserver LLM calls (fastest). " + "'medium' observes via LLM and replans only on step failure (balanced). " "'high' runs full observation pipeline with replanning, refinement, " "and early goal detection (most adaptive, highest latency)." ), ) + observe_steps: bool | None = Field( + default=None, + description=( + "Run PlannerObserver LLM calls after each step. " + "None (default): LLM observation for 'medium' and 'high' only; " + "'low' uses a heuristic (no extra LLM). " + "Set False to disable observation at any effort level." 
+ ), + ) max_attempts: int | None = Field( default=None, description=( diff --git a/lib/crewai/src/crewai/agents/planner_observer.py b/lib/crewai/src/crewai/agents/planner_observer.py index 29d586663..2882ac780 100644 --- a/lib/crewai/src/crewai/agents/planner_observer.py +++ b/lib/crewai/src/crewai/agents/planner_observer.py @@ -39,7 +39,8 @@ logger = logging.getLogger(__name__) class PlannerObserver: """Observes step execution results and decides on plan continuation. - After EVERY step execution, this class: + When ``observe_steps`` is enabled (see ``PlanningConfig``), after EVERY + step execution this class: 1. Analyzes what the step accomplished 2. Identifies new information learned 3. Decides if the remaining plan is still valid @@ -83,6 +84,32 @@ class PlannerObserver: return create_llm(config.llm) return self.agent.llm + @staticmethod + def heuristic_observation( + *, + step_success: bool, + result: str = "", + ) -> StepObservation: + """Build an observation without an LLM call. + + Used when ``PlanningConfig.observe_steps`` is False or when + ``reasoning_effort`` is ``"low"`` (the default skips LLM observation). + + Args: + step_success: Whether StepExecutor reported the step as successful. + result: The step result string (unused today; reserved for heuristics). + + Returns: + A StepObservation derived from execution metadata only. 
+ """ + _ = result + return StepObservation( + step_completed_successfully=step_success, + key_information_learned="", + remaining_plan_still_valid=True, + needs_full_replan=False, + ) + def observe( self, completed_step: TodoItem, diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 0f31b8eb2..7a139a7a0 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -108,6 +108,7 @@ from crewai.utilities.types import LLMMessage if TYPE_CHECKING: + from crewai.agents.planner_observer import PlannerObserver from crewai.agents.tools_handler import ToolsHandler from crewai.llms.base_llm import BaseLLM from crewai.tools.tool_types import ToolResult @@ -210,7 +211,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): _has_been_invoked: bool = PrivateAttr(default=False) _instance_id: str = PrivateAttr(default_factory=lambda: str(uuid4())[:8]) _step_executor: Any = PrivateAttr(default=None) - _planner_observer: Any = PrivateAttr(default=None) + _planner_observer: PlannerObserver | None = PrivateAttr(default=None) @model_validator(mode="after") def _setup_executor(self) -> Self: @@ -360,7 +361,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): ) return self._step_executor - def _ensure_planner_observer(self) -> Any: + def _ensure_planner_observer(self) -> PlannerObserver: """Lazily create the PlannerObserver (avoids circular imports).""" if self._planner_observer is None: from crewai.agents.planner_observer import PlannerObserver @@ -407,6 +408,63 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): return int(config.step_timeout) if config.step_timeout is not None else None return None + def _should_observe_steps(self) -> bool: + """Whether to run PlannerObserver LLM calls after each step. + + Explicit ``observe_steps=False`` disables observation at any effort level. 
+ ``observe_steps=True`` forces it even at ``reasoning_effort="low"``. + When unset, ``low`` skips LLM observation; ``medium`` and ``high`` run it. + """ + config = self.agent.planning_config + if config is not None and config.observe_steps is not None: + return bool(config.observe_steps) + if config is not None and config.reasoning_effort == "low": + return False + return True + + def _step_success_from_log(self, step_number: int) -> bool | None: + """Read StepExecutor success flag from the execution audit log.""" + for entry in reversed(self.state.execution_log): + if ( + entry.get("type") == "step_execution" + and entry.get("step_number") == step_number + ): + success = entry.get("success") + if success is not None: + return bool(success) + return None + + def _observe_completed_step( + self, + *, + completed_step: TodoItem, + result: str, + all_completed: list[TodoItem], + remaining_todos: list[TodoItem], + step_success: bool | None = None, + ) -> StepObservation: + """Observe a completed step via LLM or a lightweight heuristic.""" + from crewai.agents.planner_observer import PlannerObserver + + if self._should_observe_steps(): + observer = self._ensure_planner_observer() + return observer.observe( + completed_step=completed_step, + result=result, + all_completed=all_completed, + remaining_todos=remaining_todos, + ) + + if step_success is None: + step_success = self._step_success_from_log(completed_step.step_number) + if step_success is None: + step_success = True + + return PlannerObserver.heuristic_observation( + step_success=step_success, + result=result, + ) + def _build_context_for_todo(self, todo: TodoItem) -> StepExecutionContext: """Build an isolated execution context for a single todo. @@ -450,13 +508,13 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): ) -> Literal["step_observed_low", "step_observed_medium", "step_observed_high"]: """Observe step result and route based on reasoning_effort level. 
- Always runs PlannerObserver.observe() to validate whether the step - succeeded. Then routes to the appropriate handler based on the - agent's reasoning_effort setting: + Runs PlannerObserver LLM observation when enabled (medium/high by + default; low uses a heuristic with no extra LLM call). Then routes to + the appropriate handler based on the agent's reasoning_effort setting: - - "low": observe → mark complete → continue (no replan/refine) - - "medium": observe → replan on failure only (no refine) - - "high": observe → full decide pipeline (replan/refine/goal-achieved) + - "low": heuristic observe → mark complete → continue (no replan/refine) + - "medium": LLM observe → replan on failure only (no refine) + - "high": LLM observe → full decide pipeline (replan/refine/goal-achieved) Based on PLAN-AND-ACT Section 3.3. """ @@ -467,11 +525,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): # No todo — route to low handler which will just continue return "step_observed_low" - observer = self._ensure_planner_observer() all_completed = self.state.todos.get_completed_todos() remaining = self.state.todos.get_pending_todos() - observation = observer.observe( + observation = self._observe_completed_step( completed_step=current_todo, result=current_todo.result or "", all_completed=all_completed, @@ -491,6 +548,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): "needs_full_replan": observation.needs_full_replan, "goal_already_achieved": observation.goal_already_achieved, "reasoning_effort": effort, + "llm_observation": self._should_observe_steps(), } ) @@ -532,10 +590,8 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): observation = self.state.observations.get(current_todo.step_number) - # Even at low effort, don't ignore a hard step failure. - # A hard failure is one where the step did not succeed AND a replan - # is explicitly required (e.g. 
required tool not found, permission - # denied, environment misconfiguration). + # Even at low effort, don't record failed steps as completed. Only + # trigger replanning for hard failures that explicitly require it. if ( observation and not observation.step_completed_successfully @@ -557,6 +613,22 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): ) return "replan_now" + if observation and not observation.step_completed_successfully: + self.state.todos.mark_failed( + current_todo.step_number, result=current_todo.result + ) + if self.agent.verbose: + failed = len(self.state.todos.get_failed_todos()) + total = len(self.state.todos.items) + PRINTER.print( + content=( + f"[Low] Step {current_todo.step_number} failed " + f"({failed} failed/{total} total) — continuing" + ), + color="yellow", + ) + return "continue_plan" + self.state.todos.mark_completed( current_todo.step_number, result=current_todo.result ) @@ -1109,17 +1181,17 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): # Observe each completed step sequentially (observation updates shared state) effort = self._get_reasoning_effort() - observer = self._ensure_planner_observer() - for todo, _result in step_results: + for todo, step_result in step_results: all_completed = self.state.todos.get_completed_todos() remaining = self.state.todos.get_pending_todos() - observation = observer.observe( + observation = self._observe_completed_step( completed_step=todo, result=todo.result or "", all_completed=all_completed, remaining_todos=remaining, + step_success=step_result.success, ) self.state.observations[todo.step_number] = observation @@ -1134,6 +1206,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): "needs_full_replan": observation.needs_full_replan, "goal_already_achieved": observation.goal_already_achieved, "reasoning_effort": effort, + "llm_observation": self._should_observe_steps(), } ) diff --git a/lib/crewai/src/crewai/utilities/reasoning_handler.py 
b/lib/crewai/src/crewai/utilities/reasoning_handler.py index ab3cbba16..e14a875af 100644 --- a/lib/crewai/src/crewai/utilities/reasoning_handler.py +++ b/lib/crewai/src/crewai/utilities/reasoning_handler.py @@ -169,10 +169,11 @@ class AgentReasoning: if self.agent.planning_config is not None: return self.agent.planning_config - # Fallback for backward compatibility - return PlanningConfig( - max_attempts=getattr(self.agent, "max_reasoning_attempts", None), - ) + # Fallback when planning is enabled without an explicit config + max_attempts = getattr(self.agent, "max_reasoning_attempts", None) + if max_attempts is not None: + return PlanningConfig(max_attempts=max_attempts) + return PlanningConfig() def _resolve_llm(self) -> LLM: """Resolve which LLM to use for planning. diff --git a/lib/crewai/tests/agents/test_agent_executor.py b/lib/crewai/tests/agents/test_agent_executor.py index 3413e30ac..72ab239b1 100644 --- a/lib/crewai/tests/agents/test_agent_executor.py +++ b/lib/crewai/tests/agents/test_agent_executor.py @@ -950,6 +950,18 @@ class TestNativeToolExecution: class TestPlannerObserver: + def test_heuristic_observation_reflects_step_success(self): + from crewai.agents.planner_observer import PlannerObserver + + ok = PlannerObserver.heuristic_observation(step_success=True, result="42") + assert ok.step_completed_successfully is True + assert ok.needs_full_replan is False + + failed = PlannerObserver.heuristic_observation( + step_success=False, result="Error: timeout" + ) + assert failed.step_completed_successfully is False + def test_observe_fallback_is_conservative_on_llm_error(self): llm = Mock() llm.call.side_effect = RuntimeError("llm unavailable") @@ -1332,19 +1344,93 @@ class TestResponseFormatWithKickoff: class TestReasoningEffort: """Test reasoning_effort levels in PlanningConfig. 
- - low: observe() runs (validates step success), but skip decide/replan/refine + - low: heuristic observation (no LLM), skip decide/replan/refine - medium: observe() runs, replan on failure only (mocked) - high: full observation pipeline with decide/replan/refine/goal-achieved """ + def test_should_observe_steps_respects_config(self): + """observe_steps and reasoning_effort gate PlannerObserver LLM calls.""" + from crewai.agent.planning_config import PlanningConfig + from crewai.experimental.agent_executor import AgentExecutor + + executor = Mock(spec=AgentExecutor) + executor._should_observe_steps = ( + AgentExecutor._should_observe_steps.__get__(executor) + ) + executor.agent = Mock() + + executor.agent.planning_config = PlanningConfig(reasoning_effort="low") + assert executor._should_observe_steps() is False + + executor.agent.planning_config = PlanningConfig( + reasoning_effort="low", observe_steps=True + ) + assert executor._should_observe_steps() is True + + executor.agent.planning_config = PlanningConfig( + reasoning_effort="high", observe_steps=False + ) + assert executor._should_observe_steps() is False + + executor.agent.planning_config = PlanningConfig(reasoning_effort="medium") + assert executor._should_observe_steps() is True + + executor.agent.planning_config = None + assert executor._should_observe_steps() is True + + def test_reasoning_effort_low_skips_planner_observer_llm(self): + """Low effort must not call PlannerObserver.observe (no per-step LLM).""" + from crewai.agent.planning_config import PlanningConfig + from crewai.experimental.agent_executor import AgentExecutor + from crewai.utilities.planning_types import TodoItem, TodoList + + executor = Mock(spec=AgentExecutor) + executor.agent = Mock() + executor.agent.planning_config = PlanningConfig(reasoning_effort="low") + executor.state = Mock() + executor.state.execution_log = [ + {"type": "step_execution", "step_number": 1, "success": True}, + ] + + executor._should_observe_steps = ( + 
AgentExecutor._should_observe_steps.__get__(executor) + ) + executor._step_success_from_log = ( + AgentExecutor._step_success_from_log.__get__(executor) + ) + executor._observe_completed_step = ( + AgentExecutor._observe_completed_step.__get__(executor) + ) + executor._ensure_planner_observer = Mock() + + todo = TodoItem( + step_number=1, + description="Step one", + status="running", + result="done", + ) + executor.state.todos = TodoList(items=[todo]) + + observation = executor._observe_completed_step( + completed_step=todo, + result="done", + all_completed=[], + remaining_todos=[], + ) + + executor._ensure_planner_observer.assert_not_called() + assert observation.step_completed_successfully is True + @pytest.mark.vcr() def test_reasoning_effort_low_skips_decide_and_replan(self): - """Low effort: observe runs but decide/replan/refine are never called. + """Low effort: heuristic observe, no decide/replan/refine LLM pipeline. Verifies that with reasoning_effort='low': 1. The agent produces a correct result - 2. The observation phase still runs (observations are stored) + 2. Observations are still stored (heuristic path) 3. The decide_next_action/refine/replan pipeline is bypassed + 4. 
Per-step observation did not use the PlannerObserver LLM """ from crewai import Agent, PlanningConfig from crewai.llm import LLM @@ -1382,11 +1468,11 @@ class TestReasoningEffort: assert result is not None assert "10" in str(result) - # Verify observations were still collected (observe() ran) + # Verify observations were still collected (heuristic path, no LLM) executor = executor_ref[0] if executor is not None and executor.state.todos.items: assert len(executor.state.observations) > 0, ( - "Low effort should still run observe() to validate steps" + "Low effort should still record heuristic observations" ) # Verify no replan was triggered @@ -1401,6 +1487,7 @@ class TestReasoningEffort: ] for log in observation_logs: assert log.get("reasoning_effort") == "low" + assert log.get("llm_observation") is False @pytest.mark.vcr() def test_reasoning_effort_high_runs_full_observation_pipeline(self): @@ -1573,6 +1660,47 @@ class TestReasoningEffort: assert todo.status == "completed" assert todo.result == "Done successfully" + def test_reasoning_effort_low_marks_failed_steps_failed_without_replan(self): + """Low effort records failed heuristic observations without replanning.""" + from crewai.experimental.agent_executor import AgentExecutor + from crewai.utilities.planning_types import ( + StepObservation, + TodoItem, + TodoList, + ) + + executor = Mock(spec=AgentExecutor) + executor.agent = Mock() + executor.agent.verbose = False + executor.agent.planning_config = Mock() + executor.agent.planning_config.reasoning_effort = "low" + executor.handle_step_observed_low = ( + AgentExecutor.handle_step_observed_low.__get__(executor) + ) + + todo = TodoItem( + step_number=1, + description="Do something", + status="running", + result="Error: tool failed", + ) + todo_list = TodoList(items=[todo]) + executor.state = Mock() + executor.state.todos = todo_list + executor.state.observations = { + 1: StepObservation( + step_completed_successfully=False, + key_information_learned="", + 
remaining_plan_still_valid=True, + needs_full_replan=False, + ) + } + + route = executor.handle_step_observed_low() + assert route == "continue_plan" + assert todo.status == "failed" + assert todo.result == "Error: tool failed" + def test_planning_config_reasoning_effort_default_is_medium(self): """Verify PlanningConfig defaults reasoning_effort to 'medium' (aligned with runtime default in _get_reasoning_effort).""" diff --git a/lib/crewai/tests/agents/test_agent_reasoning.py b/lib/crewai/tests/agents/test_agent_reasoning.py index f04ad8c31..68e7c0556 100644 --- a/lib/crewai/tests/agents/test_agent_reasoning.py +++ b/lib/crewai/tests/agents/test_agent_reasoning.py @@ -23,6 +23,8 @@ def test_planning_config_default_values(): assert config.plan_prompt is None assert config.refine_prompt is None assert config.llm is None + assert config.observe_steps is None + assert config.reasoning_effort == "medium" def test_planning_config_custom_values(): @@ -88,6 +90,28 @@ def test_agent_with_planning_config_disabled(): assert agent.planning_enabled is False +def test_planning_true_without_config_sets_bounded_max_attempts(): + """planning=True alone must not leave max_attempts=None (infinite refine loop).""" + llm = LLM("gpt-4o-mini") + + agent = Agent( + role="Test Agent", + goal="Test", + backstory="Test", + llm=llm, + planning=True, + verbose=False, + ) + + assert agent.planning_config is not None + assert agent.planning_config.max_attempts == 1 + assert agent.planning_config.reasoning_effort == "low" + assert agent.planning_config.max_steps == 20 + assert agent.planning_config.max_replans == 3 + assert agent.planning_config.max_step_iterations == 15 + assert agent.planning_config.step_timeout is None + + def test_planning_enabled_property(): """Test the planning_enabled property on Agent.""" llm = LLM("gpt-4o-mini")