fix: default observation parse fallback to failure and clean up plan-execute types

When _parse_observation_response fails all parse attempts, default to
step_completed_successfully=False instead of True to avoid silently
masking failures. Extract duplicate _extract_task_section into a shared
utility in agent_utils. Type PlanningConfig.llm as str | BaseLLM | None
instead of str | Any | None. Make StepResult a frozen dataclass for
immutability consistency with StepExecutionContext.
This commit is contained in:
Greyson LaLonde
2026-03-15 15:42:22 -04:00
parent 4772a4a0d6
commit 1593336c94
6 changed files with 45 additions and 58 deletions

View File

@@ -1,9 +1,11 @@
from __future__ import annotations
from typing import Any, Literal
from typing import Literal
from pydantic import BaseModel, Field
from crewai.llms.base_llm import BaseLLM
class PlanningConfig(BaseModel):
"""Configuration for agent planning/reasoning before task execution.
@@ -128,7 +130,7 @@ class PlanningConfig(BaseModel):
"whether to continue or replan. None means no per-step timeout."
),
)
llm: str | Any | None = Field(
llm: str | BaseLLM | None = Field(
default=None,
description="LLM to use for planning. Uses agent's LLM if None.",
)

View File

@@ -22,6 +22,7 @@ from crewai.events.types.observation_events import (
StepObservationFailedEvent,
StepObservationStartedEvent,
)
from crewai.utilities.agent_utils import extract_task_section
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.planning_types import StepObservation, TodoItem
@@ -192,22 +193,6 @@ class PlannerObserver:
needs_full_replan=False,
)
def _extract_task_section(self, text: str) -> str:
"""Extract the ## Task body from a structured enriched instruction.
Falls back to the full text (capped at 2000 chars) for plain inputs.
"""
for marker in ("\n## Task\n", "\n## Task:", "## Task\n"):
idx = text.find(marker)
if idx >= 0:
start = idx + len(marker)
for end_marker in ("\n---\n", "\n## "):
end = text.find(end_marker, start)
if end > 0:
return text[start:end].strip()
return text[start : start + 2000].strip()
return text[:2000] if len(text) > 2000 else text
def apply_refinements(
self,
observation: StepObservation,
@@ -231,7 +216,9 @@ class PlannerObserver:
todo_by_step: dict[int, TodoItem] = {t.step_number: t for t in remaining_todos}
for refinement in observation.suggested_refinements:
if refinement.step_number in todo_by_step and refinement.new_description:
todo_by_step[refinement.step_number].description = refinement.new_description
todo_by_step[
refinement.step_number
].description = refinement.new_description
return remaining_todos
@@ -256,7 +243,7 @@ class PlannerObserver:
# Standalone kickoff path — no Task object, but we have the raw input.
# Extract just the ## Task section so the observer sees the actual goal,
# not the full enriched instruction with env/tools/verification noise.
task_desc = self._extract_task_section(self.kickoff_input)
task_desc = extract_task_section(self.kickoff_input)
task_goal = "Complete the task successfully"
system_prompt = self._i18n.retrieve("planning", "observation_system_prompt")
@@ -329,7 +316,9 @@ class PlannerObserver:
if text.startswith("```"):
lines = text.split("\n")
# Strip first and last lines (``` markers)
inner = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
inner = "\n".join(
lines[1:-1] if lines[-1].strip() == "```" else lines[1:]
)
try:
return StepObservation.model_validate_json(inner.strip())
except Exception: # noqa: S110
@@ -345,12 +334,12 @@ class PlannerObserver:
# Last resort — log what we got so it's diagnosable
logger.warning(
"Could not parse observation response (type=%s). "
"Falling back to default success observation. Preview: %.200s",
"Falling back to default failure observation. Preview: %.200s",
type(response).__name__,
str(response),
)
return StepObservation(
step_completed_successfully=True,
step_completed_successfully=False,
key_information_learned=str(response) if response else "",
remaining_plan_still_valid=True,
remaining_plan_still_valid=False,
)

View File

@@ -32,6 +32,7 @@ from crewai.utilities.agent_utils import (
check_native_tool_support,
enforce_rpm_limit,
execute_single_native_tool_call,
extract_task_section,
format_message_for_llm,
is_tool_call_list,
process_llm_response,
@@ -235,35 +236,6 @@ class StepExecutor:
tools_section=tools_section,
)
def _extract_task_section(self, task_description: str) -> str:
"""Extract the most relevant portion of the task description.
For structured descriptions (e.g. harbor_agent-style with ## Task
and ## Instructions sections), extracts just the task body so the
executor sees the requirements without duplicating tool/verification
instructions that are already in the system prompt.
For plain descriptions, returns the full text (up to 2000 chars).
"""
# Try to extract between "## Task" and the next "---" separator
# or next "##" heading — this isolates the task spec from env/tool noise.
for marker in ("\n## Task\n", "\n## Task:", "## Task\n"):
idx = task_description.find(marker)
if idx >= 0:
start = idx + len(marker)
# End at the first horizontal rule or next top-level ## section
for end_marker in ("\n---\n", "\n## "):
end = task_description.find(end_marker, start)
if end > 0:
return task_description[start:end].strip()
# No end marker — take up to 2000 chars
return task_description[start : start + 2000].strip()
# No structured format — use the full description, reasonably truncated
if len(task_description) > 2000:
return task_description[:2000] + "\n... [truncated]"
return task_description
def _build_user_prompt(self, todo: TodoItem, context: StepExecutionContext) -> str:
"""Build the user prompt for this specific step."""
parts: list[str] = []
@@ -273,7 +245,7 @@ class StepExecutor:
# We extract only the task body (not tool instructions or verification
# sections) to avoid duplicating directives already in the system prompt.
if context.task_description:
task_section = self._extract_task_section(context.task_description)
task_section = extract_task_section(context.task_description)
if task_section:
parts.append(
self._i18n.retrieve(

View File

@@ -214,6 +214,30 @@ def convert_tools_to_openai_schema(
return openai_tools, available_functions, tool_name_mapping
def extract_task_section(text: str) -> str:
"""Extract the ## Task body from a structured enriched instruction.
For structured descriptions (e.g. with ## Task and ## Instructions sections),
extracts just the task body so the caller sees the requirements without
duplicating tool/verification instructions.
Falls back to the full text (up to 2000 chars, with a truncation marker)
for plain inputs.
"""
for marker in ("\n## Task\n", "\n## Task:", "## Task\n"):
idx = text.find(marker)
if idx >= 0:
start = idx + len(marker)
for end_marker in ("\n---\n", "\n## "):
end = text.find(end_marker, start)
if end > 0:
return text[start:end].strip()
return text[start : start + 2000].strip()
if len(text) > 2000:
return text[:2000] + "\n... [truncated]"
return text
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
"""Check if the maximum number of iterations has been reached.

View File

@@ -41,7 +41,7 @@ class StepExecutionContext:
return self.dependency_results.get(step_number)
@dataclass
@dataclass(frozen=True)
class StepResult:
"""Result returned by a StepExecutor after executing a single todo.

View File

@@ -1658,15 +1658,15 @@ class TestObserverResponseParsing:
assert result.replan_reason == "step timed out"
def test_parse_unparseable_falls_back_gracefully(self):
"""Totally unparseable response falls back to default."""
"""Totally unparseable response falls back to default failure."""
from crewai.agents.planner_observer import PlannerObserver
from crewai.utilities.planning_types import StepObservation
result = PlannerObserver._parse_observation_response(12345)
assert isinstance(result, StepObservation)
assert result.step_completed_successfully is True
assert result.remaining_plan_still_valid is True
assert result.step_completed_successfully is False
assert result.remaining_plan_still_valid is False
def test_observe_parses_json_string_from_llm(self):
"""End-to-end: observer.observe() correctly parses a JSON string from llm.call()."""