mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-23 09:58:16 +00:00
Compare commits
1 Commits
docs/check
...
lorenze/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b546c0a618 |
@@ -379,8 +379,17 @@ class Agent(BaseAgent):
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
kwargs: dict[str, int] = {}
|
||||
if self.max_reasoning_attempts is not None:
|
||||
kwargs["max_attempts"] = self.max_reasoning_attempts
|
||||
self.planning_config = PlanningConfig(**kwargs)
|
||||
|
||||
if self.planning and self.planning_config is None:
|
||||
# Bare planning=True should be bounded and avoid per-step
|
||||
# PlannerObserver LLM calls unless explicitly configured.
|
||||
self.planning_config = PlanningConfig(
|
||||
max_attempts=self.max_reasoning_attempts,
|
||||
reasoning_effort="low",
|
||||
max_attempts=1,
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
@@ -19,15 +19,18 @@ class PlanningConfig(BaseModel):
|
||||
|
||||
Attributes:
|
||||
reasoning_effort: Controls observation and replanning after each step.
|
||||
- "low": Observe each step (validates success), but skip the
|
||||
decide/replan/refine pipeline. Steps are marked complete and
|
||||
execution continues linearly. Fastest option.
|
||||
- "medium": Observe each step. On failure, trigger replanning.
|
||||
- "low": Skip per-step PlannerObserver LLM calls (heuristic only);
|
||||
skip the decide/replan/refine pipeline. Fastest option.
|
||||
- "medium": Observe each step via LLM. On failure, trigger replanning.
|
||||
On success, skip refinement and continue. Balanced option.
|
||||
- "high": Full observation pipeline — observe every step, then
|
||||
route through decide_next_action which can trigger early goal
|
||||
achievement, full replanning, or lightweight refinement.
|
||||
Most adaptive but adds latency per step.
|
||||
observe_steps: When True, run PlannerObserver LLM calls after each step.
|
||||
When False, use a lightweight heuristic (no extra LLM call).
|
||||
When None (default), LLM observation runs for "medium" and "high"
|
||||
only; "low" uses the heuristic path.
|
||||
max_attempts: Maximum number of planning refinement attempts.
|
||||
If None, will continue until the agent indicates readiness.
|
||||
max_steps: Maximum number of steps in the generated plan.
|
||||
@@ -76,12 +79,21 @@ class PlanningConfig(BaseModel):
|
||||
default="medium",
|
||||
description=(
|
||||
"Controls post-step observation and replanning behavior. "
|
||||
"'low' observes steps but skips replanning/refinement (fastest). "
|
||||
"'medium' observes and replans only on step failure (balanced). "
|
||||
"'low' skips per-step PlannerObserver LLM calls (fastest). "
|
||||
"'medium' observes via LLM and replans only on step failure (balanced). "
|
||||
"'high' runs full observation pipeline with replanning, refinement, "
|
||||
"and early goal detection (most adaptive, highest latency)."
|
||||
),
|
||||
)
|
||||
observe_steps: bool | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Run PlannerObserver LLM calls after each step. "
|
||||
"None (default): LLM observation for 'medium' and 'high' only; "
|
||||
"'low' uses a heuristic (no extra LLM). "
|
||||
"Set False to disable observation at any effort level."
|
||||
),
|
||||
)
|
||||
max_attempts: int | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
|
||||
@@ -39,7 +39,8 @@ logger = logging.getLogger(__name__)
|
||||
class PlannerObserver:
|
||||
"""Observes step execution results and decides on plan continuation.
|
||||
|
||||
After EVERY step execution, this class:
|
||||
When ``observe_steps`` is enabled (see ``PlanningConfig``), after EVERY
|
||||
step execution this class:
|
||||
1. Analyzes what the step accomplished
|
||||
2. Identifies new information learned
|
||||
3. Decides if the remaining plan is still valid
|
||||
@@ -83,6 +84,32 @@ class PlannerObserver:
|
||||
return create_llm(config.llm)
|
||||
return self.agent.llm
|
||||
|
||||
@staticmethod
def heuristic_observation(
    *,
    step_success: bool,
    result: str = "",
) -> StepObservation:
    """Produce a ``StepObservation`` from execution metadata, with no LLM call.

    The observation mirrors the executor-reported success flag and otherwise
    reports a neutral outcome: nothing new learned, the remaining plan is
    treated as still valid, and no full replan is requested.

    Args:
        step_success: Success flag to copy into the observation.
        result: Step result text. Accepted but not consulted.

    Returns:
        A ``StepObservation`` whose ``step_completed_successfully`` equals
        ``step_success``.
    """
    # The result text is part of the signature but is deliberately ignored.
    del result

    observation_fields = {
        "step_completed_successfully": step_success,
        "key_information_learned": "",
        "remaining_plan_still_valid": True,
        "needs_full_replan": False,
    }
    return StepObservation(**observation_fields)
|
||||
|
||||
def observe(
|
||||
self,
|
||||
completed_step: TodoItem,
|
||||
|
||||
@@ -407,6 +407,63 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
return int(config.step_timeout) if config.step_timeout is not None else None
|
||||
return None
|
||||
|
||||
def _should_observe_steps(self) -> bool:
    """Decide whether per-step PlannerObserver LLM observation should run.

    Resolution order:

    1. No planning config on the agent -> observe (``True``).
    2. ``observe_steps`` explicitly set on the config -> that value wins,
       regardless of ``reasoning_effort``.
    3. Otherwise observe unless ``reasoning_effort`` is ``"low"``.

    Returns:
        ``True`` when LLM observation should run, ``False`` otherwise.
    """
    planning_config = self.agent.planning_config
    if planning_config is None:
        return True

    explicit_choice = planning_config.observe_steps
    if explicit_choice is not None:
        return bool(explicit_choice)

    return planning_config.reasoning_effort != "low"
|
||||
|
||||
def _step_success_from_log(self, step_number: int) -> bool | None:
|
||||
"""Read StepExecutor success flag from the execution audit log."""
|
||||
for entry in reversed(self.state.execution_log):
|
||||
if (
|
||||
entry.get("type") == "step_execution"
|
||||
and entry.get("step_number") == step_number
|
||||
):
|
||||
success = entry.get("success")
|
||||
if success is not None:
|
||||
return bool(success)
|
||||
return None
|
||||
|
||||
def _observe_completed_step(
    self,
    *,
    completed_step: TodoItem,
    result: str,
    all_completed: list[TodoItem],
    remaining_todos: list[TodoItem],
    step_success: bool | None = None,
) -> StepObservation:
    """Produce an observation for a finished step, via LLM or heuristic.

    When ``_should_observe_steps()`` is true, the planner observer obtained
    from ``_ensure_planner_observer()`` handles the step. Otherwise a
    heuristic observation is built from a success flag resolved in this
    order: the ``step_success`` argument, then the execution log entry for
    the step, then ``True`` when neither is available.

    Args:
        completed_step: The step that just finished.
        result: Result text for the step.
        all_completed: Completed steps, forwarded to the observer.
        remaining_todos: Pending steps, forwarded to the observer.
        step_success: Optional success flag already known to the caller.

    Returns:
        The ``StepObservation`` from whichever path was taken.
    """
    from crewai.agents.planner_observer import PlannerObserver

    if not self._should_observe_steps():
        succeeded = step_success
        if succeeded is None:
            succeeded = self._step_success_from_log(completed_step.step_number)
        if succeeded is None:
            # No flag from the caller or the log: treat the step as successful.
            succeeded = True
        return PlannerObserver.heuristic_observation(
            step_success=succeeded,
            result=result,
        )

    llm_observer = self._ensure_planner_observer()
    return llm_observer.observe(
        completed_step=completed_step,
        result=result,
        all_completed=all_completed,
        remaining_todos=remaining_todos,
    )
|
||||
|
||||
def _build_context_for_todo(self, todo: TodoItem) -> StepExecutionContext:
|
||||
"""Build an isolated execution context for a single todo.
|
||||
|
||||
@@ -450,13 +507,13 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
) -> Literal["step_observed_low", "step_observed_medium", "step_observed_high"]:
|
||||
"""Observe step result and route based on reasoning_effort level.
|
||||
|
||||
Always runs PlannerObserver.observe() to validate whether the step
|
||||
succeeded. Then routes to the appropriate handler based on the
|
||||
agent's reasoning_effort setting:
|
||||
Runs PlannerObserver LLM observation when enabled (medium/high by
|
||||
default; low uses a heuristic with no extra LLM call). Then routes to
|
||||
the appropriate handler based on the agent's reasoning_effort setting:
|
||||
|
||||
- "low": observe → mark complete → continue (no replan/refine)
|
||||
- "medium": observe → replan on failure only (no refine)
|
||||
- "high": observe → full decide pipeline (replan/refine/goal-achieved)
|
||||
- "low": heuristic observe → mark complete → continue (no replan/refine)
|
||||
- "medium": LLM observe → replan on failure only (no refine)
|
||||
- "high": LLM observe → full decide pipeline (replan/refine/goal-achieved)
|
||||
|
||||
Based on PLAN-AND-ACT Section 3.3.
|
||||
"""
|
||||
@@ -467,11 +524,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
# No todo — route to low handler which will just continue
|
||||
return "step_observed_low"
|
||||
|
||||
observer = self._ensure_planner_observer()
|
||||
all_completed = self.state.todos.get_completed_todos()
|
||||
remaining = self.state.todos.get_pending_todos()
|
||||
|
||||
observation = observer.observe(
|
||||
observation = self._observe_completed_step(
|
||||
completed_step=current_todo,
|
||||
result=current_todo.result or "",
|
||||
all_completed=all_completed,
|
||||
@@ -491,6 +547,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
"needs_full_replan": observation.needs_full_replan,
|
||||
"goal_already_achieved": observation.goal_already_achieved,
|
||||
"reasoning_effort": effort,
|
||||
"llm_observation": self._should_observe_steps(),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1109,17 +1166,17 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
|
||||
# Observe each completed step sequentially (observation updates shared state)
|
||||
effort = self._get_reasoning_effort()
|
||||
observer = self._ensure_planner_observer()
|
||||
|
||||
for todo, _result in step_results:
|
||||
for todo, step_result in step_results:
|
||||
all_completed = self.state.todos.get_completed_todos()
|
||||
remaining = self.state.todos.get_pending_todos()
|
||||
|
||||
observation = observer.observe(
|
||||
observation = self._observe_completed_step(
|
||||
completed_step=todo,
|
||||
result=todo.result or "",
|
||||
all_completed=all_completed,
|
||||
remaining_todos=remaining,
|
||||
step_success=step_result.success,
|
||||
)
|
||||
|
||||
self.state.observations[todo.step_number] = observation
|
||||
@@ -1134,6 +1191,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
"needs_full_replan": observation.needs_full_replan,
|
||||
"goal_already_achieved": observation.goal_already_achieved,
|
||||
"reasoning_effort": effort,
|
||||
"llm_observation": self._should_observe_steps(),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -238,7 +238,6 @@ def extract_task_section(text: str) -> str:
|
||||
return text[:2000] + "\n... [truncated]"
|
||||
return text
|
||||
|
||||
|
||||
def _executor_stop_words(
|
||||
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
|
||||
) -> list[str]:
|
||||
|
||||
@@ -169,10 +169,11 @@ class AgentReasoning:
|
||||
|
||||
if self.agent.planning_config is not None:
|
||||
return self.agent.planning_config
|
||||
# Fallback for backward compatibility
|
||||
return PlanningConfig(
|
||||
max_attempts=getattr(self.agent, "max_reasoning_attempts", None),
|
||||
)
|
||||
# Fallback when planning is enabled without an explicit config
|
||||
max_attempts = getattr(self.agent, "max_reasoning_attempts", None)
|
||||
if max_attempts is not None:
|
||||
return PlanningConfig(max_attempts=max_attempts)
|
||||
return PlanningConfig()
|
||||
|
||||
def _resolve_llm(self) -> LLM:
|
||||
"""Resolve which LLM to use for planning.
|
||||
|
||||
@@ -950,6 +950,18 @@ class TestNativeToolExecution:
|
||||
|
||||
|
||||
class TestPlannerObserver:
|
||||
def test_heuristic_observation_reflects_step_success(self):
    """The heuristic observation copies the supplied success flag."""
    from crewai.agents.planner_observer import PlannerObserver

    build = PlannerObserver.heuristic_observation

    success_obs = build(step_success=True, result="42")
    assert success_obs.step_completed_successfully is True
    assert success_obs.needs_full_replan is False

    failure_obs = build(step_success=False, result="Error: timeout")
    assert failure_obs.step_completed_successfully is False
|
||||
|
||||
def test_observe_fallback_is_conservative_on_llm_error(self):
|
||||
llm = Mock()
|
||||
llm.call.side_effect = RuntimeError("llm unavailable")
|
||||
@@ -1332,19 +1344,93 @@ class TestResponseFormatWithKickoff:
|
||||
class TestReasoningEffort:
|
||||
"""Test reasoning_effort levels in PlanningConfig.
|
||||
|
||||
- low: observe() runs (validates step success), but skip decide/replan/refine
|
||||
- low: heuristic observation (no LLM), skip decide/replan/refine
|
||||
- medium: observe() runs, replan on failure only (mocked)
|
||||
- high: full observation pipeline with decide/replan/refine/goal-achieved
|
||||
"""
|
||||
|
||||
def test_should_observe_steps_respects_config(self):
    """observe_steps and reasoning_effort gate PlannerObserver LLM calls."""
    from crewai.agent.planning_config import PlanningConfig
    from crewai.experimental.agent_executor import AgentExecutor

    executor = Mock(spec=AgentExecutor)
    # Bind the real method onto the mock so its logic is what gets exercised.
    executor._should_observe_steps = AgentExecutor._should_observe_steps.__get__(
        executor
    )
    executor.agent = Mock()

    # (planning_config, expected result of _should_observe_steps)
    scenarios = [
        (PlanningConfig(reasoning_effort="low"), False),
        (PlanningConfig(reasoning_effort="low", observe_steps=True), True),
        (PlanningConfig(reasoning_effort="high", observe_steps=False), False),
        (PlanningConfig(reasoning_effort="medium"), True),
        (None, True),
    ]
    for planning_config, expected in scenarios:
        executor.agent.planning_config = planning_config
        assert executor._should_observe_steps() is expected
|
||||
|
||||
def test_reasoning_effort_low_skips_planner_observer_llm(self):
    """Low effort must not call PlannerObserver.observe (no per-step LLM)."""
    from crewai.agent.planning_config import PlanningConfig
    from crewai.experimental.agent_executor import AgentExecutor
    from crewai.utilities.planning_types import TodoItem, TodoList

    executor = Mock(spec=AgentExecutor)
    executor.agent = Mock()
    executor.agent.planning_config = PlanningConfig(reasoning_effort="low")
    executor.state = Mock()
    executor.state.execution_log = [
        {"type": "step_execution", "step_number": 1, "success": True},
    ]

    # Bind the real implementations under test onto the mock executor.
    for method_name in (
        "_should_observe_steps",
        "_step_success_from_log",
        "_observe_completed_step",
    ):
        real_method = getattr(AgentExecutor, method_name)
        setattr(executor, method_name, real_method.__get__(executor))
    executor._ensure_planner_observer = Mock()

    step = TodoItem(
        step_number=1,
        description="Step one",
        status="running",
        result="done",
    )
    executor.state.todos = TodoList(items=[step])

    observation = executor._observe_completed_step(
        completed_step=step,
        result="done",
        all_completed=[],
        remaining_todos=[],
    )

    executor._ensure_planner_observer.assert_not_called()
    assert observation.step_completed_successfully is True
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_reasoning_effort_low_skips_decide_and_replan(self):
|
||||
"""Low effort: observe runs but decide/replan/refine are never called.
|
||||
"""Low effort: heuristic observe, no decide/replan/refine LLM pipeline.
|
||||
|
||||
Verifies that with reasoning_effort='low':
|
||||
1. The agent produces a correct result
|
||||
2. The observation phase still runs (observations are stored)
|
||||
2. Observations are still stored (heuristic path)
|
||||
3. The decide_next_action/refine/replan pipeline is bypassed
|
||||
4. Per-step observation did not use the PlannerObserver LLM
|
||||
"""
|
||||
from crewai import Agent, PlanningConfig
|
||||
from crewai.llm import LLM
|
||||
@@ -1382,11 +1468,11 @@ class TestReasoningEffort:
|
||||
assert result is not None
|
||||
assert "10" in str(result)
|
||||
|
||||
# Verify observations were still collected (observe() ran)
|
||||
# Verify observations were still collected (heuristic path, no LLM)
|
||||
executor = executor_ref[0]
|
||||
if executor is not None and executor.state.todos.items:
|
||||
assert len(executor.state.observations) > 0, (
|
||||
"Low effort should still run observe() to validate steps"
|
||||
"Low effort should still record heuristic observations"
|
||||
)
|
||||
|
||||
# Verify no replan was triggered
|
||||
@@ -1401,6 +1487,7 @@ class TestReasoningEffort:
|
||||
]
|
||||
for log in observation_logs:
|
||||
assert log.get("reasoning_effort") == "low"
|
||||
assert log.get("llm_observation") is False
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_reasoning_effort_high_runs_full_observation_pipeline(self):
|
||||
|
||||
@@ -23,6 +23,8 @@ def test_planning_config_default_values():
|
||||
assert config.plan_prompt is None
|
||||
assert config.refine_prompt is None
|
||||
assert config.llm is None
|
||||
assert config.observe_steps is None
|
||||
assert config.reasoning_effort == "medium"
|
||||
|
||||
|
||||
def test_planning_config_custom_values():
|
||||
@@ -88,6 +90,28 @@ def test_agent_with_planning_config_disabled():
|
||||
assert agent.planning_enabled is False
|
||||
|
||||
|
||||
def test_planning_true_without_config_sets_bounded_max_attempts():
    """planning=True alone must not leave max_attempts=None (infinite refine loop)."""
    agent_kwargs = {
        "role": "Test Agent",
        "goal": "Test",
        "backstory": "Test",
        "llm": LLM("gpt-4o-mini"),
        "planning": True,
        "verbose": False,
    }
    agent = Agent(**agent_kwargs)

    assert agent.planning_config is not None
    config = agent.planning_config

    # Values asserted for a bare planning=True agent.
    assert config.max_attempts == 1
    assert config.reasoning_effort == "low"
    assert config.max_steps == 20
    assert config.max_replans == 3
    assert config.max_step_iterations == 15
    assert config.step_timeout is None
|
||||
|
||||
|
||||
def test_planning_enabled_property():
|
||||
"""Test the planning_enabled property on Agent."""
|
||||
llm = LLM("gpt-4o-mini")
|
||||
|
||||
Reference in New Issue
Block a user