refactor: streamline observation and refinement process in PlannerObserver

- Updated the PlannerObserver to apply structured refinements directly from observations without requiring a second LLM call.
- Renamed the `refine_todos` method to `apply_refinements` for clarity.
- Enhanced documentation to reflect changes in how refinements are handled.
- Removed unnecessary LLM message building and parsing logic, simplifying the refinement process.
- Updated event emissions to include summaries of refinements instead of raw data.
This commit is contained in:
lorenzejay
2026-02-24 09:03:04 -08:00
parent 8194bb42f1
commit 32059c7d79
4 changed files with 63 additions and 103 deletions

View File

@@ -1,11 +1,14 @@
"""PlannerObserver: Observation phase after each step execution.
Implements the "Observe" phase. After every
step execution, the Planner analyzes what happened, what new information was
learned, and whether the remaining plan is still valid.
Implements the "Observe" phase. After every step execution, the Planner
analyzes what happened, what new information was learned, and whether the
remaining plan is still valid.
This is NOT an error detector — it runs on every step, including successes,
to incorporate runtime observations into the remaining plan.
Refinements are structured (StepRefinement objects) and applied directly
from the observation result — no second LLM call required.
"""
from __future__ import annotations
@@ -96,11 +99,12 @@ class PlannerObserver:
remaining_todos: The pending todos still in the plan.
Returns:
StepObservation with the Planner's analysis.
StepObservation with the Planner's analysis. Any suggested
refinements are structured StepRefinement objects ready for
direct application — no second LLM call needed.
"""
agent_role = self.agent.role if self.agent else "unknown"
agent_role = self.agent.role
# Emit observation started event
crewai_event_bus.emit(
self.agent,
event=StepObservationStartedEvent(
@@ -127,15 +131,21 @@ class PlannerObserver:
if isinstance(response, StepObservation):
observation = response
else:
# If the LLM returned raw text instead of structured output,
# parse it conservatively
observation = StepObservation(
step_completed_successfully=True,
key_information_learned=str(response) if response else "",
remaining_plan_still_valid=True,
)
# Emit observation completed event
refinement_summaries = (
[
f"Step {r.step_number}: {r.new_description}"
for r in observation.suggested_refinements
]
if observation.suggested_refinements
else None
)
crewai_event_bus.emit(
self.agent,
event=StepObservationCompletedEvent(
@@ -148,7 +158,7 @@ class PlannerObserver:
needs_full_replan=observation.needs_full_replan,
replan_reason=observation.replan_reason,
goal_already_achieved=observation.goal_already_achieved,
suggested_refinements=observation.suggested_refinements,
suggested_refinements=refinement_summaries,
from_task=self.task,
from_agent=self.agent,
),
@@ -159,7 +169,6 @@ class PlannerObserver:
except Exception as e:
logger.warning(f"Observation LLM call failed: {e}. Defaulting to continue.")
# Emit observation failed event
crewai_event_bus.emit(
self.agent,
event=StepObservationFailedEvent(
@@ -178,47 +187,30 @@ class PlannerObserver:
remaining_plan_still_valid=True,
)
def refine_todos(
def apply_refinements(
self,
observation: StepObservation,
remaining_todos: list[TodoItem],
) -> list[TodoItem]:
"""Refine pending todo descriptions based on observation.
"""Apply structured refinements from the observation directly to todo descriptions.
This is a LIGHTWEIGHT operation — no full replan. It updates the
description field of pending todos based on new information learned.
Example: Step 1 found "3 products: A, B, C" → Step 2 changes from
"Select the best product" to "Select product B (highest rated)"
No LLM call needed — refinements are already structured StepRefinement
objects produced by the observation call. This is a pure in-memory update.
Args:
observation: The observation with suggested refinements.
remaining_todos: The pending todos to refine.
observation: The observation containing structured refinements.
remaining_todos: The pending todos to update in-place.
Returns:
The refined todo list (same objects, updated descriptions).
The same todo list with updated descriptions where refinements applied.
"""
if not observation.suggested_refinements:
return remaining_todos
# Ask the LLM to apply the refinements to the todo descriptions
messages = self._build_refinement_messages(observation, remaining_todos)
try:
response = self.llm.call(
messages,
from_task=self.task,
from_agent=self.agent,
)
if response:
# Parse the LLM's refined descriptions and apply them
self._apply_refinements(str(response), remaining_todos)
except Exception as e:
logger.warning(
f"Refinement LLM call failed: {e}. Keeping original descriptions."
)
todo_by_step: dict[int, TodoItem] = {t.step_number: t for t in remaining_todos}
for refinement in observation.suggested_refinements:
if refinement.step_number in todo_by_step and refinement.new_description:
todo_by_step[refinement.step_number].description = refinement.new_description
return remaining_todos
@@ -282,57 +274,3 @@ class PlannerObserver:
{"role": "user", "content": user_prompt},
]
def _build_refinement_messages(
    self,
    observation: StepObservation,
    remaining_todos: list[TodoItem],
) -> list[LLMMessage]:
    """Assemble the system/user message pair for the refinement LLM call.

    Args:
        observation: Observation whose ``key_information_learned`` and
            ``suggested_refinements`` are interpolated into the user prompt.
            A missing (``None``) refinement list is rendered as an empty
            string.
        remaining_todos: Pending todos; each one is rendered as a
            ``Step N: <description>`` line.

    Returns:
        A two-element list: the system message followed by the user message.
    """
    system_message = {
        "role": "system",
        "content": self._i18n.retrieve("planning", "refinement_system_prompt"),
    }

    # Both prompt sections are newline-separated, one entry per line.
    suggestion_block = "\n".join(observation.suggested_refinements or [])
    pending_block = "\n".join(
        [f"Step {todo.step_number}: {todo.description}" for todo in remaining_todos]
    )

    user_template = self._i18n.retrieve("planning", "refinement_user_prompt")
    user_message = {
        "role": "user",
        "content": user_template.format(
            key_information_learned=observation.key_information_learned,
            refinements=suggestion_block,
            todo_lines=pending_block,
        ),
    }
    return [system_message, user_message]
def _apply_refinements(
    self,
    llm_response: str,
    remaining_todos: list[TodoItem],
) -> None:
    """Update todo descriptions in place from a line-oriented LLM reply.

    Each useful line has the shape ``Step N: <description>``. Lines that do
    not start with ``"Step "``, have no colon, carry a non-integer step
    number, name a step that is not in ``remaining_todos``, or have an empty
    description are ignored.

    Args:
        llm_response: Raw text returned by the refinement LLM call.
        remaining_todos: Pending todos whose ``description`` may be rewritten.
    """
    # Index the pending todos by step number for direct lookup.
    pending: dict[int, TodoItem] = {}
    for todo in remaining_todos:
        pending[todo.step_number] = todo

    for raw_line in llm_response.strip().split("\n"):
        entry = raw_line.strip()
        if not entry.startswith("Step "):
            continue

        # Split on the first colon only, so descriptions may contain colons.
        label, colon, text = entry.partition(":")
        if not colon:
            continue

        new_text = text.strip()
        try:
            number = int(label.strip().replace("Step", "").strip())
            if new_text and number in pending:
                pending[number].description = new_text
        except (ValueError, IndexError):
            # Unparseable step number (or a rejected assignment): skip line.
            continue

View File

@@ -721,9 +721,14 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
observer = self._ensure_planner_observer()
remaining = self.state.todos.get_pending_todos()
observer.refine_todos(recent_observation, remaining)
observer.apply_refinements(recent_observation, remaining)
refinement_summaries = [
f"Step {r.step_number}: {r.new_description}"
for r in recent_observation.suggested_refinements
]
# Emit refinement event
crewai_event_bus.emit(
self.agent,
event=PlanRefinementEvent(
@@ -731,7 +736,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
step_number=last_step,
step_description="",
refined_step_count=len(remaining),
refinements=recent_observation.suggested_refinements,
refinements=refinement_summaries,
from_task=self.task,
from_agent=self.agent,
),

View File

@@ -85,8 +85,6 @@
"refine_plan_prompt": "Your previous plan:\n{current_plan}\n\nYou indicated you weren't ready. Refine your plan to address the specific gap.\n\nKeep the plan minimal - only add steps that directly address the issue.\n\nConclude with READY or NOT READY as before.",
"observation_system_prompt": "You are a Planning Agent observing execution progress. After each step completes, you analyze what happened and decide whether the remaining plan is still valid.\n\nReason step-by-step about:\n1. What new information was learned from this step's result\n2. Whether the remaining steps still make sense given this new information\n3. What refinements, if any, are needed for upcoming steps\n4. Whether the overall goal has already been achieved\n\nBe conservative about triggering full replans — only do so when the remaining plan is fundamentally wrong, not just suboptimal.",
"observation_user_prompt": "## Original task\n{task_description}\n\n## Expected output\n{task_goal}\n{completed_summary}\n\n## Just completed step {step_number}\nDescription: {step_description}\nResult: {step_result}\n{remaining_summary}\n\nAnalyze this step's result and provide your observation.",
"refinement_system_prompt": "You are refining upcoming plan steps based on new information. Update the step descriptions to be more specific and actionable given what was learned. Keep the same step numbers.\n\nRespond with one line per step in the format:\nStep N: <refined description>",
"refinement_user_prompt": "## New information learned\n{key_information_learned}\n\n## Suggested refinements\n{refinements}\n\n## Current pending steps\n{todo_lines}\n\nUpdate the step descriptions to incorporate the new information.",
"step_executor_system_prompt": "You are {role}. {backstory}\n\nYour goal: {goal}\n\nYou are executing a specific step in a multi-step plan. Focus ONLY on completing the current step. Do not plan ahead or worry about future steps.\n\nBefore acting, briefly reason about what you need to do and which approach or tool would be most helpful for this specific step.{tools_section}",
"step_executor_tools_section": "\n\nAvailable tools: {tool_names}\n\nTo use a tool, respond with:\nThought: <your reasoning>\nAction: <tool_name>\nAction Input: <input>\n\nWhen you have the final answer, respond with:\nThought: <your reasoning>\nFinal Answer: <your answer>",
"step_executor_user_prompt": "## Current Step\n{step_description}",

View File

@@ -174,6 +174,20 @@ class TodoList(BaseModel):
self.items = non_pending + new_items
class StepRefinement(BaseModel):
    """One targeted rewrite of a pending step's description.

    Instances travel inside ``StepObservation.suggested_refinements``. Each
    one names the step to change and carries the replacement text, so the
    update can be applied as-is without asking the LLM a second time.
    """

    # Identifies which pending step the refinement targets.
    step_number: int = Field(
        description="The step number to update (1-based)",
    )
    # Replacement text for that step's description.
    new_description: str = Field(
        description="The updated, more specific description for this step",
    )
class StepObservation(BaseModel):
"""Planner's observation after a step execution completes.
@@ -189,9 +203,10 @@ class StepObservation(BaseModel):
(e.g., "Found 3 products: A, B, C"). Used to refine upcoming steps.
remaining_plan_still_valid: Whether pending todos still make sense
given the new information. True does NOT mean no refinement needed.
suggested_refinements: Minor tweaks to upcoming step descriptions.
These are lightweight in-place updates, not a full replan.
Example: ["Step 3 should select product B instead of 'best product'"]
suggested_refinements: Structured in-place updates to pending step
descriptions. Each entry targets a specific step by number. These
are applied directly without a second LLM call.
Example: [{"step_number": 3, "new_description": "Select product B (highest rated)"}]
needs_full_replan: The remaining plan is fundamentally wrong and must
be regenerated from scratch. Mutually exclusive with
remaining_plan_still_valid (if this is True, that should be False).
@@ -211,9 +226,13 @@ class StepObservation(BaseModel):
default=True,
description="Whether the remaining pending todos still make sense given new information",
)
suggested_refinements: list[str] | None = Field(
suggested_refinements: list[StepRefinement] | None = Field(
default=None,
description="Minor tweaks to descriptions of upcoming steps (lightweight, no full replan)",
description=(
"Structured updates to pending step descriptions based on new information. "
"Each entry specifies a step_number and new_description. "
"Applied directly — no separate replan needed."
),
)
needs_full_replan: bool = Field(
default=False,