Compare commits

..

1 Commits

Author SHA1 Message Date
lorenzejay
b546c0a618 feat(planning): enhance planning configuration and observation handling
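- Introduced the `observe_steps` attribute in `PlanningConfig` to control PlannerObserver LLM calls after each step.
- Updated `Agent` to set the default `max_attempts` to 1 (with `reasoning_effort="low"`) when planning is enabled without an explicit config.
- Modified `PlannerObserver` to support heuristic observations when LLM calls are disabled.
- Adjusted `AgentExecutor` to respect the `observe_steps` and `reasoning_effort` settings for step observations.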
- Added tests to verify behavior of new configurations and ensure correct observation handling across different reasoning efforts.
2026-05-22 18:18:11 -07:00
8 changed files with 246 additions and 29 deletions

View File

@@ -379,8 +379,17 @@ class Agent(BaseAgent):
DeprecationWarning,
stacklevel=2,
)
kwargs: dict[str, int] = {}
if self.max_reasoning_attempts is not None:
kwargs["max_attempts"] = self.max_reasoning_attempts
self.planning_config = PlanningConfig(**kwargs)
if self.planning and self.planning_config is None:
# Bare planning=True should be bounded and avoid per-step
# PlannerObserver LLM calls unless explicitly configured.
self.planning_config = PlanningConfig(
max_attempts=self.max_reasoning_attempts,
reasoning_effort="low",
max_attempts=1,
)
return self

View File

@@ -19,15 +19,18 @@ class PlanningConfig(BaseModel):
Attributes:
reasoning_effort: Controls observation and replanning after each step.
- "low": Observe each step (validates success), but skip the
decide/replan/refine pipeline. Steps are marked complete and
execution continues linearly. Fastest option.
- "medium": Observe each step. On failure, trigger replanning.
- "low": Skip per-step PlannerObserver LLM calls (heuristic only);
skip the decide/replan/refine pipeline. Fastest option.
- "medium": Observe each step via LLM. On failure, trigger replanning.
On success, skip refinement and continue. Balanced option.
- "high": Full observation pipeline — observe every step, then
route through decide_next_action which can trigger early goal
achievement, full replanning, or lightweight refinement.
Most adaptive but adds latency per step.
observe_steps: When True, run PlannerObserver LLM calls after each step.
When False, use a lightweight heuristic (no extra LLM call).
When None (default), LLM observation runs for "medium" and "high"
only; "low" uses the heuristic path.
max_attempts: Maximum number of planning refinement attempts.
If None, will continue until the agent indicates readiness.
max_steps: Maximum number of steps in the generated plan.
@@ -76,12 +79,21 @@ class PlanningConfig(BaseModel):
default="medium",
description=(
"Controls post-step observation and replanning behavior. "
"'low' observes steps but skips replanning/refinement (fastest). "
"'medium' observes and replans only on step failure (balanced). "
"'low' skips per-step PlannerObserver LLM calls (fastest). "
"'medium' observes via LLM and replans only on step failure (balanced). "
"'high' runs full observation pipeline with replanning, refinement, "
"and early goal detection (most adaptive, highest latency)."
),
)
observe_steps: bool | None = Field(
default=None,
description=(
"Run PlannerObserver LLM calls after each step. "
"None (default): LLM observation for 'medium' and 'high' only; "
"'low' uses a heuristic (no extra LLM). "
"Set False to disable observation at any effort level."
),
)
max_attempts: int | None = Field(
default=None,
description=(

View File

@@ -39,7 +39,8 @@ logger = logging.getLogger(__name__)
class PlannerObserver:
"""Observes step execution results and decides on plan continuation.
After EVERY step execution, this class:
When ``observe_steps`` is enabled (see ``PlanningConfig``), after EVERY
step execution this class:
1. Analyzes what the step accomplished
2. Identifies new information learned
3. Decides if the remaining plan is still valid
@@ -83,6 +84,32 @@ class PlannerObserver:
return create_llm(config.llm)
return self.agent.llm
@staticmethod
def heuristic_observation(
    *,
    step_success: bool,
    result: str = "",
) -> StepObservation:
    """Create a ``StepObservation`` from execution metadata, with no LLM call.

    This is the lightweight path taken when ``PlanningConfig.observe_steps``
    is False, or when ``reasoning_effort`` is ``"low"`` and ``observe_steps``
    is left unset.

    Args:
        step_success: Success flag reported for the executed step.
        result: Step result text. Accepted for forward compatibility but
            currently ignored.

    Returns:
        A StepObservation whose success flag mirrors ``step_success``; no
        information is recorded as learned, the remaining plan is treated
        as still valid, and no full replan is requested.
    """
    # ``result`` is deliberately unused for now; drop the local reference.
    del result
    observation_fields = {
        "step_completed_successfully": step_success,
        "key_information_learned": "",
        "remaining_plan_still_valid": True,
        "needs_full_replan": False,
    }
    return StepObservation(**observation_fields)
def observe(
self,
completed_step: TodoItem,

View File

@@ -407,6 +407,63 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
return int(config.step_timeout) if config.step_timeout is not None else None
return None
def _should_observe_steps(self) -> bool:
    """Decide whether PlannerObserver LLM calls run after each step.

    Resolution order:
      1. No planning config on the agent -> observe (True).
      2. ``observe_steps`` set explicitly -> that value wins, at any
         ``reasoning_effort`` level (False disables, True forces).
      3. Otherwise -> observe unless ``reasoning_effort`` is ``"low"``.

    Returns:
        True when the LLM-backed observation should run, False when the
        caller should fall back to the heuristic observation.
    """
    planning = self.agent.planning_config
    if planning is None:
        return True

    explicit_choice = planning.observe_steps
    if explicit_choice is not None:
        return bool(explicit_choice)

    return planning.reasoning_effort != "low"
def _step_success_from_log(self, step_number: int) -> bool | None:
    """Look up the recorded success flag for a step in the execution log.

    The log is scanned from newest to oldest. Only entries whose ``type``
    is ``"step_execution"`` and whose ``step_number`` matches are
    considered; matching entries without a ``success`` value are skipped
    so an older entry can still supply the flag.

    Args:
        step_number: Number of the step whose outcome is wanted.

    Returns:
        The most recent non-None ``success`` value coerced to bool, or
        None when the log holds no such value for this step.
    """
    recorded_flags = (
        record.get("success")
        for record in reversed(self.state.execution_log)
        if record.get("type") == "step_execution"
        and record.get("step_number") == step_number
    )
    return next(
        (bool(flag) for flag in recorded_flags if flag is not None),
        None,
    )
def _observe_completed_step(
    self,
    *,
    completed_step: TodoItem,
    result: str,
    all_completed: list[TodoItem],
    remaining_todos: list[TodoItem],
    step_success: bool | None = None,
) -> StepObservation:
    """Produce the observation for a finished step.

    When ``_should_observe_steps()`` is true the PlannerObserver's
    ``observe`` is used; otherwise a heuristic observation is built
    without touching the observer.

    Args:
        completed_step: The todo that just finished executing.
        result: Result text of that step.
        all_completed: Todos completed so far (forwarded to the observer).
        remaining_todos: Todos still pending (forwarded to the observer).
        step_success: Known success flag for the step. When None on the
            heuristic path, the execution log is consulted, and the step
            is assumed successful if the log has no answer either.

    Returns:
        The StepObservation from the observer or from the heuristic.
    """
    from crewai.agents.planner_observer import PlannerObserver

    if not self._should_observe_steps():
        # Heuristic path: explicit flag first, then the audit log, and
        # finally default to success when nothing is recorded.
        succeeded = step_success
        if succeeded is None:
            succeeded = self._step_success_from_log(completed_step.step_number)
        if succeeded is None:
            succeeded = True
        return PlannerObserver.heuristic_observation(
            step_success=succeeded,
            result=result,
        )

    planner_observer = self._ensure_planner_observer()
    return planner_observer.observe(
        completed_step=completed_step,
        result=result,
        all_completed=all_completed,
        remaining_todos=remaining_todos,
    )
def _build_context_for_todo(self, todo: TodoItem) -> StepExecutionContext:
"""Build an isolated execution context for a single todo.
@@ -450,13 +507,13 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
) -> Literal["step_observed_low", "step_observed_medium", "step_observed_high"]:
"""Observe step result and route based on reasoning_effort level.
Always runs PlannerObserver.observe() to validate whether the step
succeeded. Then routes to the appropriate handler based on the
agent's reasoning_effort setting:
Runs PlannerObserver LLM observation when enabled (medium/high by
default; low uses a heuristic with no extra LLM call). Then routes to
the appropriate handler based on the agent's reasoning_effort setting:
- "low": observe → mark complete → continue (no replan/refine)
- "medium": observe → replan on failure only (no refine)
- "high": observe → full decide pipeline (replan/refine/goal-achieved)
- "low": heuristic observe → mark complete → continue (no replan/refine)
- "medium": LLM observe → replan on failure only (no refine)
- "high": LLM observe → full decide pipeline (replan/refine/goal-achieved)
Based on PLAN-AND-ACT Section 3.3.
"""
@@ -467,11 +524,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
# No todo — route to low handler which will just continue
return "step_observed_low"
observer = self._ensure_planner_observer()
all_completed = self.state.todos.get_completed_todos()
remaining = self.state.todos.get_pending_todos()
observation = observer.observe(
observation = self._observe_completed_step(
completed_step=current_todo,
result=current_todo.result or "",
all_completed=all_completed,
@@ -491,6 +547,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
"needs_full_replan": observation.needs_full_replan,
"goal_already_achieved": observation.goal_already_achieved,
"reasoning_effort": effort,
"llm_observation": self._should_observe_steps(),
}
)
@@ -1109,17 +1166,17 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
# Observe each completed step sequentially (observation updates shared state)
effort = self._get_reasoning_effort()
observer = self._ensure_planner_observer()
for todo, _result in step_results:
for todo, step_result in step_results:
all_completed = self.state.todos.get_completed_todos()
remaining = self.state.todos.get_pending_todos()
observation = observer.observe(
observation = self._observe_completed_step(
completed_step=todo,
result=todo.result or "",
all_completed=all_completed,
remaining_todos=remaining,
step_success=step_result.success,
)
self.state.observations[todo.step_number] = observation
@@ -1134,6 +1191,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
"needs_full_replan": observation.needs_full_replan,
"goal_already_achieved": observation.goal_already_achieved,
"reasoning_effort": effort,
"llm_observation": self._should_observe_steps(),
}
)

View File

@@ -238,7 +238,6 @@ def extract_task_section(text: str) -> str:
return text[:2000] + "\n... [truncated]"
return text
def _executor_stop_words(
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
) -> list[str]:

View File

@@ -169,10 +169,11 @@ class AgentReasoning:
if self.agent.planning_config is not None:
return self.agent.planning_config
# Fallback for backward compatibility
return PlanningConfig(
max_attempts=getattr(self.agent, "max_reasoning_attempts", None),
)
# Fallback when planning is enabled without an explicit config
max_attempts = getattr(self.agent, "max_reasoning_attempts", None)
if max_attempts is not None:
return PlanningConfig(max_attempts=max_attempts)
return PlanningConfig()
def _resolve_llm(self) -> LLM:
"""Resolve which LLM to use for planning.

View File

@@ -950,6 +950,18 @@ class TestNativeToolExecution:
class TestPlannerObserver:
def test_heuristic_observation_reflects_step_success(self):
    """The heuristic observation mirrors the success flag it is given."""
    from crewai.agents.planner_observer import PlannerObserver

    build = PlannerObserver.heuristic_observation

    succeeded = build(step_success=True, result="42")
    assert succeeded.step_completed_successfully is True
    assert succeeded.needs_full_replan is False

    errored = build(step_success=False, result="Error: timeout")
    assert errored.step_completed_successfully is False
def test_observe_fallback_is_conservative_on_llm_error(self):
llm = Mock()
llm.call.side_effect = RuntimeError("llm unavailable")
@@ -1332,19 +1344,93 @@ class TestResponseFormatWithKickoff:
class TestReasoningEffort:
"""Test reasoning_effort levels in PlanningConfig.
- low: observe() runs (validates step success), but skip decide/replan/refine
- low: heuristic observation (no LLM), skip decide/replan/refine
- medium: observe() runs, replan on failure only (mocked)
- high: full observation pipeline with decide/replan/refine/goal-achieved
"""
def test_should_observe_steps_respects_config(self):
    """``observe_steps`` and ``reasoning_effort`` together gate LLM observation."""
    from crewai.agent.planning_config import PlanningConfig
    from crewai.experimental.agent_executor import AgentExecutor

    executor = Mock(spec=AgentExecutor)
    # Bind the real implementation onto the mock so the actual logic runs.
    executor._should_observe_steps = (
        AgentExecutor._should_observe_steps.__get__(executor)
    )
    executor.agent = Mock()

    # (planning_config, expected decision), checked in order.
    scenarios = [
        (PlanningConfig(reasoning_effort="low"), False),
        (PlanningConfig(reasoning_effort="low", observe_steps=True), True),
        (PlanningConfig(reasoning_effort="high", observe_steps=False), False),
        (PlanningConfig(reasoning_effort="medium"), True),
        (None, True),
    ]
    for planning_config, expected in scenarios:
        executor.agent.planning_config = planning_config
        assert executor._should_observe_steps() is expected
def test_reasoning_effort_low_skips_planner_observer_llm(self):
    """With low effort the per-step observer is never created or called."""
    from crewai.agent.planning_config import PlanningConfig
    from crewai.experimental.agent_executor import AgentExecutor
    from crewai.utilities.planning_types import TodoItem, TodoList

    step = TodoItem(
        step_number=1,
        description="Step one",
        status="running",
        result="done",
    )

    executor = Mock(spec=AgentExecutor)
    executor.agent = Mock()
    executor.agent.planning_config = PlanningConfig(reasoning_effort="low")
    executor.state = Mock()
    executor.state.execution_log = [
        {"type": "step_execution", "step_number": 1, "success": True},
    ]
    executor.state.todos = TodoList(items=[step])
    executor._ensure_planner_observer = Mock()

    # Swap in the real implementations of the methods under test.
    for method_name in (
        "_should_observe_steps",
        "_step_success_from_log",
        "_observe_completed_step",
    ):
        bound = getattr(AgentExecutor, method_name).__get__(executor)
        setattr(executor, method_name, bound)

    observation = executor._observe_completed_step(
        completed_step=step,
        result="done",
        all_completed=[],
        remaining_todos=[],
    )

    executor._ensure_planner_observer.assert_not_called()
    assert observation.step_completed_successfully is True
@pytest.mark.vcr()
def test_reasoning_effort_low_skips_decide_and_replan(self):
"""Low effort: observe runs but decide/replan/refine are never called.
"""Low effort: heuristic observe, no decide/replan/refine LLM pipeline.
Verifies that with reasoning_effort='low':
1. The agent produces a correct result
2. The observation phase still runs (observations are stored)
2. Observations are still stored (heuristic path)
3. The decide_next_action/refine/replan pipeline is bypassed
4. Per-step observation did not use the PlannerObserver LLM
"""
from crewai import Agent, PlanningConfig
from crewai.llm import LLM
@@ -1382,11 +1468,11 @@ class TestReasoningEffort:
assert result is not None
assert "10" in str(result)
# Verify observations were still collected (observe() ran)
# Verify observations were still collected (heuristic path, no LLM)
executor = executor_ref[0]
if executor is not None and executor.state.todos.items:
assert len(executor.state.observations) > 0, (
"Low effort should still run observe() to validate steps"
"Low effort should still record heuristic observations"
)
# Verify no replan was triggered
@@ -1401,6 +1487,7 @@ class TestReasoningEffort:
]
for log in observation_logs:
assert log.get("reasoning_effort") == "low"
assert log.get("llm_observation") is False
@pytest.mark.vcr()
def test_reasoning_effort_high_runs_full_observation_pipeline(self):

View File

@@ -23,6 +23,8 @@ def test_planning_config_default_values():
assert config.plan_prompt is None
assert config.refine_prompt is None
assert config.llm is None
assert config.observe_steps is None
assert config.reasoning_effort == "medium"
def test_planning_config_custom_values():
@@ -88,6 +90,28 @@ def test_agent_with_planning_config_disabled():
assert agent.planning_enabled is False
def test_planning_true_without_config_sets_bounded_max_attempts():
    """Bare ``planning=True`` yields a bounded, low-effort planning config."""
    agent = Agent(
        role="Test Agent",
        goal="Test",
        backstory="Test",
        llm=LLM("gpt-4o-mini"),
        planning=True,
        verbose=False,
    )

    config = agent.planning_config
    assert config is not None

    expected_fields = {
        "max_attempts": 1,
        "reasoning_effort": "low",
        "max_steps": 20,
        "max_replans": 3,
        "max_step_iterations": 15,
    }
    for field_name, expected_value in expected_fields.items():
        assert getattr(config, field_name) == expected_value
    assert config.step_timeout is None
def test_planning_enabled_property():
"""Test the planning_enabled property on Agent."""
llm = LLM("gpt-4o-mini")