From 32059c7d79027f2ef34869e8f2b3a3ae45bdd41f Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Tue, 24 Feb 2026 09:03:04 -0800 Subject: [PATCH] refactor: streamline observation and refinement process in PlannerObserver - Updated the PlannerObserver to apply structured refinements directly from observations without requiring a second LLM call. - Renamed method to for clarity. - Enhanced documentation to reflect changes in how refinements are handled. - Removed unnecessary LLM message building and parsing logic, simplifying the refinement process. - Updated event emissions to include summaries of refinements instead of raw data. --- .../src/crewai/agents/planner_observer.py | 124 +++++------------- .../src/crewai/experimental/agent_executor.py | 11 +- lib/crewai/src/crewai/translations/en.json | 2 - .../src/crewai/utilities/planning_types.py | 29 +++- 4 files changed, 63 insertions(+), 103 deletions(-) diff --git a/lib/crewai/src/crewai/agents/planner_observer.py b/lib/crewai/src/crewai/agents/planner_observer.py index 0a2967ba7..4d8fdde2c 100644 --- a/lib/crewai/src/crewai/agents/planner_observer.py +++ b/lib/crewai/src/crewai/agents/planner_observer.py @@ -1,11 +1,14 @@ """PlannerObserver: Observation phase after each step execution. -Implements the "Observe" phase. After every -step execution, the Planner analyzes what happened, what new information was -learned, and whether the remaining plan is still valid. +Implements the "Observe" phase. After every step execution, the Planner +analyzes what happened, what new information was learned, and whether the +remaining plan is still valid. This is NOT an error detector — it runs on every step, including successes, to incorporate runtime observations into the remaining plan. + +Refinements are structured (StepRefinement objects) and applied directly +from the observation result — no second LLM call required. """ from __future__ import annotations @@ -96,11 +99,12 @@ class PlannerObserver: remaining_todos: The pending todos still in the plan. Returns: - StepObservation with the Planner's analysis. + StepObservation with the Planner's analysis. Any suggested + refinements are structured StepRefinement objects ready for + direct application — no second LLM call needed. """ - agent_role = self.agent.role if self.agent else "unknown" + agent_role = self.agent.role - # Emit observation started event crewai_event_bus.emit( self.agent, event=StepObservationStartedEvent( @@ -127,15 +131,21 @@ class PlannerObserver: if isinstance(response, StepObservation): observation = response else: - # If the LLM returned raw text instead of structured output, - # parse it conservatively observation = StepObservation( step_completed_successfully=True, key_information_learned=str(response) if response else "", remaining_plan_still_valid=True, ) - # Emit observation completed event + refinement_summaries = ( + [ + f"Step {r.step_number}: {r.new_description}" + for r in observation.suggested_refinements + ] + if observation.suggested_refinements + else None + ) + crewai_event_bus.emit( self.agent, event=StepObservationCompletedEvent( @@ -148,7 +158,7 @@ class PlannerObserver: needs_full_replan=observation.needs_full_replan, replan_reason=observation.replan_reason, goal_already_achieved=observation.goal_already_achieved, - suggested_refinements=observation.suggested_refinements, + suggested_refinements=refinement_summaries, from_task=self.task, from_agent=self.agent, ), @@ -159,7 +169,6 @@ class PlannerObserver: except Exception as e: logger.warning(f"Observation LLM call failed: {e}. Defaulting to continue.") - # Emit observation failed event crewai_event_bus.emit( self.agent, event=StepObservationFailedEvent( @@ -178,47 +187,30 @@ class PlannerObserver: remaining_plan_still_valid=True, ) - def refine_todos( + def apply_refinements( self, observation: StepObservation, remaining_todos: list[TodoItem], ) -> list[TodoItem]: - """Refine pending todo descriptions based on observation. + """Apply structured refinements from the observation directly to todo descriptions. - This is a LIGHTWEIGHT operation — no full replan. It updates the - description field of pending todos based on new information learned. - - Example: Step 1 found "3 products: A, B, C" → Step 2 changes from - "Select the best product" to "Select product B (highest rated)" + No LLM call needed — refinements are already structured StepRefinement + objects produced by the observation call. This is a pure in-memory update. Args: - observation: The observation with suggested refinements. - remaining_todos: The pending todos to refine. + observation: The observation containing structured refinements. + remaining_todos: The pending todos to update in-place. Returns: - The refined todo list (same objects, updated descriptions). + The same todo list with updated descriptions where refinements applied. """ if not observation.suggested_refinements: return remaining_todos - # Ask the LLM to apply the refinements to the todo descriptions - messages = self._build_refinement_messages(observation, remaining_todos) - - try: - response = self.llm.call( - messages, - from_task=self.task, - from_agent=self.agent, - ) - - if response: - # Parse the LLM's refined descriptions and apply them - self._apply_refinements(str(response), remaining_todos) - - except Exception as e: - logger.warning( - f"Refinement LLM call failed: {e}. Keeping original descriptions." - ) + todo_by_step: dict[int, TodoItem] = {t.step_number: t for t in remaining_todos} + for refinement in observation.suggested_refinements: + if refinement.step_number in todo_by_step and refinement.new_description: + todo_by_step[refinement.step_number].description = refinement.new_description return remaining_todos @@ -282,57 +274,3 @@ class PlannerObserver: {"role": "user", "content": user_prompt}, ] - def _build_refinement_messages( - self, - observation: StepObservation, - remaining_todos: list[TodoItem], - ) -> list[LLMMessage]: - """Build messages for the refinement LLM call.""" - system_prompt = self._i18n.retrieve("planning", "refinement_system_prompt") - - refinements = "\n".join(observation.suggested_refinements or []) - todo_lines = "\n".join( - f"Step {t.step_number}: {t.description}" for t in remaining_todos - ) - - user_prompt = self._i18n.retrieve("planning", "refinement_user_prompt").format( - key_information_learned=observation.key_information_learned, - refinements=refinements, - todo_lines=todo_lines, - ) - - return [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - - def _apply_refinements( - self, - llm_response: str, - remaining_todos: list[TodoItem], - ) -> None: - """Parse LLM refinement response and update todo descriptions. - - Expects format: "Step N: " per line. - """ - # Build lookup for quick access - todo_by_step: dict[int, TodoItem] = {t.step_number: t for t in remaining_todos} - - for line in llm_response.strip().split("\n"): - line = line.strip() - if not line.startswith("Step "): - continue - - # Parse "Step N: description" - try: - parts = line.split(":", 1) - if len(parts) < 2: - continue - step_part = parts[0].strip() # "Step N" - description = parts[1].strip() - step_num = int(step_part.replace("Step", "").strip()) - - if step_num in todo_by_step and description: - todo_by_step[step_num].description = description - except (ValueError, IndexError): - continue diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 5b994bddc..6351bedbd 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -721,9 +721,14 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): observer = self._ensure_planner_observer() remaining = self.state.todos.get_pending_todos() - observer.refine_todos(recent_observation, remaining) + observer.apply_refinements(recent_observation, remaining) + + + refinement_summaries = [ + f"Step {r.step_number}: {r.new_description}" + for r in recent_observation.suggested_refinements + ] - # Emit refinement event crewai_event_bus.emit( self.agent, event=PlanRefinementEvent( @@ -731,7 +736,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): step_number=last_step, step_description="", refined_step_count=len(remaining), - refinements=recent_observation.suggested_refinements, + refinements=refinement_summaries, from_task=self.task, from_agent=self.agent, ), diff --git a/lib/crewai/src/crewai/translations/en.json b/lib/crewai/src/crewai/translations/en.json index 997bde5fa..2e6d30f9d 100644 --- a/lib/crewai/src/crewai/translations/en.json +++ b/lib/crewai/src/crewai/translations/en.json @@ -85,8 +85,6 @@ "refine_plan_prompt": "Your previous plan:\n{current_plan}\n\nYou indicated you weren't ready. Refine your plan to address the specific gap.\n\nKeep the plan minimal - only add steps that directly address the issue.\n\nConclude with READY or NOT READY as before.", "observation_system_prompt": "You are a Planning Agent observing execution progress. After each step completes, you analyze what happened and decide whether the remaining plan is still valid.\n\nReason step-by-step about:\n1. What new information was learned from this step's result\n2. Whether the remaining steps still make sense given this new information\n3. What refinements, if any, are needed for upcoming steps\n4. Whether the overall goal has already been achieved\n\nBe conservative about triggering full replans — only do so when the remaining plan is fundamentally wrong, not just suboptimal.", "observation_user_prompt": "## Original task\n{task_description}\n\n## Expected output\n{task_goal}\n{completed_summary}\n\n## Just completed step {step_number}\nDescription: {step_description}\nResult: {step_result}\n{remaining_summary}\n\nAnalyze this step's result and provide your observation.", - "refinement_system_prompt": "You are refining upcoming plan steps based on new information. Update the step descriptions to be more specific and actionable given what was learned. Keep the same step numbers.\n\nRespond with one line per step in the format:\nStep N: ", - "refinement_user_prompt": "## New information learned\n{key_information_learned}\n\n## Suggested refinements\n{refinements}\n\n## Current pending steps\n{todo_lines}\n\nUpdate the step descriptions to incorporate the new information.", "step_executor_system_prompt": "You are {role}. {backstory}\n\nYour goal: {goal}\n\nYou are executing a specific step in a multi-step plan. Focus ONLY on completing the current step. Do not plan ahead or worry about future steps.\n\nBefore acting, briefly reason about what you need to do and which approach or tool would be most helpful for this specific step.{tools_section}", "step_executor_tools_section": "\n\nAvailable tools: {tool_names}\n\nTo use a tool, respond with:\nThought: \nAction: \nAction Input: \n\nWhen you have the final answer, respond with:\nThought: \nFinal Answer: ", "step_executor_user_prompt": "## Current Step\n{step_description}", diff --git a/lib/crewai/src/crewai/utilities/planning_types.py b/lib/crewai/src/crewai/utilities/planning_types.py index 306b3b4fc..57df36fbd 100644 --- a/lib/crewai/src/crewai/utilities/planning_types.py +++ b/lib/crewai/src/crewai/utilities/planning_types.py @@ -174,6 +174,20 @@ class TodoList(BaseModel): self.items = non_pending + new_items +class StepRefinement(BaseModel): + """A structured in-place update for a single pending step. + + Returned as part of StepObservation when the Planner learns new + information that makes a pending step description more specific. + Applied directly — no second LLM call required. + """ + + step_number: int = Field(description="The step number to update (1-based)") + new_description: str = Field( + description="The updated, more specific description for this step" + ) + + class StepObservation(BaseModel): """Planner's observation after a step execution completes. @@ -189,9 +203,10 @@ class StepObservation(BaseModel): (e.g., "Found 3 products: A, B, C"). Used to refine upcoming steps. remaining_plan_still_valid: Whether pending todos still make sense given the new information. True does NOT mean no refinement needed. - suggested_refinements: Minor tweaks to upcoming step descriptions. - These are lightweight in-place updates, not a full replan. - Example: ["Step 3 should select product B instead of 'best product'"] + suggested_refinements: Structured in-place updates to pending step + descriptions. Each entry targets a specific step by number. These + are applied directly without a second LLM call. + Example: [{"step_number": 3, "new_description": "Select product B (highest rated)"}] needs_full_replan: The remaining plan is fundamentally wrong and must be regenerated from scratch. Mutually exclusive with remaining_plan_still_valid (if this is True, that should be False). @@ -211,9 +226,13 @@ class StepObservation(BaseModel): default=True, description="Whether the remaining pending todos still make sense given new information", ) - suggested_refinements: list[str] | None = Field( + suggested_refinements: list[StepRefinement] | None = Field( default=None, - description="Minor tweaks to descriptions of upcoming steps (lightweight, no full replan)", + description=( + "Structured updates to pending step descriptions based on new information. " + "Each entry specifies a step_number and new_description. " + "Applied directly — no separate replan needed." + ), ) needs_full_replan: bool = Field( default=False,