refactor: enhance planning and execution flow in agents

- Updated the PlannerObserver to accept a kickoff input for standalone task execution, improving flexibility in task handling.
- Refined the step execution process in StepExecutor to support multi-turn action loops, allowing for iterative tool execution and observation.
- Introduced a method to extract relevant task sections from descriptions, ensuring clarity in task requirements.
- Enhanced the AgentExecutor to manage step failures more effectively, triggering replans only when necessary and preserving completed task history.
- Updated translations to reflect changes in planning principles and execution prompts, emphasizing concrete and executable steps.
This commit is contained in:
lorenzejay
2026-03-03 10:17:35 -08:00
parent 76f329a025
commit 87e1852746
5 changed files with 331 additions and 79 deletions

View File

@@ -53,9 +53,15 @@ class PlannerObserver:
task: Optional task context (for description and expected output).
"""
def __init__(self, agent: Agent, task: Task | None = None) -> None:
def __init__(
self,
agent: Agent,
task: Task | None = None,
kickoff_input: str = "",
) -> None:
self.agent = agent
self.task = task
self.kickoff_input = kickoff_input
self.llm = self._resolve_llm()
self._i18n: I18N = get_i18n()
@@ -183,14 +189,32 @@ class PlannerObserver:
),
)
# Don't force a full replan — the step may have succeeded even if the
# observer LLM failed to parse the result. Defaulting to "continue" is
# far less disruptive than wiping the entire plan on every observer error.
return StepObservation(
step_completed_successfully=False,
step_completed_successfully=True,
key_information_learned="",
remaining_plan_still_valid=False,
needs_full_replan=True,
replan_reason="Observer failed to evaluate step result safely",
remaining_plan_still_valid=True,
needs_full_replan=False,
)
def _extract_task_section(self, text: str) -> str:
"""Extract the ## Task body from a structured enriched instruction.
Falls back to the full text (capped at 2000 chars) for plain inputs.
"""
for marker in ("\n## Task\n", "\n## Task:", "## Task\n"):
idx = text.find(marker)
if idx >= 0:
start = idx + len(marker)
for end_marker in ("\n---\n", "\n## "):
end = text.find(end_marker, start)
if end > 0:
return text[start:end].strip()
return text[start : start + 2000].strip()
return text[:2000] if len(text) > 2000 else text
def apply_refinements(
self,
observation: StepObservation,
@@ -235,6 +259,12 @@ class PlannerObserver:
if self.task:
task_desc = self.task.description or ""
task_goal = self.task.expected_output or ""
elif self.kickoff_input:
# Standalone kickoff path — no Task object, but we have the raw input.
# Extract just the ## Task section so the observer sees the actual goal,
# not the full enriched instruction with env/tools/verification noise.
task_desc = self._extract_task_section(self.kickoff_input)
task_goal = "Complete the task successfully"
system_prompt = self._i18n.retrieve("planning", "observation_system_prompt")

View File

@@ -124,11 +124,12 @@ class StepExecutor:
# ------------------------------------------------------------------
def execute(self, todo: TodoItem, context: StepExecutionContext) -> StepResult:
"""Execute a single todo item using direct-action execution.
"""Execute a single todo item using a multi-turn action loop.
Enforces the RPM limit, builds a fresh message list, makes one LLM
call, executes any tool returned, and returns the result. Never
touches external state.
Enforces the RPM limit, builds a fresh message list, then iterates
LLM call → tool execution → observation until the LLM signals it is
done (text answer) or max_step_iterations is reached. Never touches
external AgentExecutor state.
Args:
todo: The todo item to execute.
@@ -207,10 +208,52 @@ class StepExecutor:
tools_section=tools_section,
)
def _extract_task_section(self, task_description: str) -> str:
"""Extract the most relevant portion of the task description.
For structured descriptions (e.g. harbor_agent-style with ## Task
and ## Instructions sections), extracts just the task body so the
executor sees the requirements without duplicating tool/verification
instructions that are already in the system prompt.
For plain descriptions, returns the full text (up to 2000 chars).
"""
# Try to extract between "## Task" and the next "---" separator
# or next "##" heading — this isolates the task spec from env/tool noise.
for marker in ("\n## Task\n", "\n## Task:", "## Task\n"):
idx = task_description.find(marker)
if idx >= 0:
start = idx + len(marker)
# End at the first horizontal rule or next top-level ## section
for end_marker in ("\n---\n", "\n## "):
end = task_description.find(end_marker, start)
if end > 0:
return task_description[start:end].strip()
# No end marker — take up to 2000 chars
return task_description[start : start + 2000].strip()
# No structured format — use the full description, reasonably truncated
if len(task_description) > 2000:
return task_description[:2000] + "\n... [truncated]"
return task_description
def _build_user_prompt(self, todo: TodoItem, context: StepExecutionContext) -> str:
"""Build the user prompt for this specific step."""
parts: list[str] = []
# Include overall task context so the executor knows the full goal and
# required output format/location — critical for knowing WHAT to produce.
# We extract only the task body (not tool instructions or verification
# sections) to avoid duplicating directives already in the system prompt.
if context.task_description:
task_section = self._extract_task_section(context.task_description)
if task_section:
parts.append(
self._i18n.retrieve("planning", "step_executor_task_context").format(
task_context=task_section,
)
)
parts.append(
self._i18n.retrieve("planning", "step_executor_user_prompt").format(
step_description=todo.description,
@@ -241,43 +284,58 @@ class StepExecutor:
return "\n".join(parts)
# ------------------------------------------------------------------
# Internal: Direct-action execution (single LLM call)
# Internal: Multi-turn execution loop
# ------------------------------------------------------------------
def _execute_text_parsed(
self,
messages: list[LLMMessage],
tool_calls_made: list[str],
max_step_iterations: int = 15,
) -> str:
"""Execute step using text-parsed tool calling (single LLM call).
"""Execute step using text-parsed tool calling with a multi-turn loop.
Calls the LLM once. If the response is a tool call, executes the tool
and returns its result. If a final answer, returns it directly.
No retry loop — the PlannerObserver handles recovery.
Iterates LLM call → tool execution → observation until the LLM
produces a Final Answer or max_step_iterations is reached.
This allows the agent to: run a command, see the output, adjust its
approach, and run another command — all within a single plan step.
"""
answer = self.llm.call(
messages,
callbacks=self.callbacks,
from_task=self.task,
from_agent=self.agent,
)
if not answer:
raise ValueError("Empty response from LLM")
answer_str = str(answer)
use_stop_words = self.llm.supports_stop_words() if self.llm else False
formatted = process_llm_response(answer_str, use_stop_words)
last_tool_result = ""
if isinstance(formatted, AgentFinish):
return str(formatted.output)
for _ in range(max_step_iterations):
answer = self.llm.call(
messages,
callbacks=self.callbacks,
from_task=self.task,
from_agent=self.agent,
)
if isinstance(formatted, AgentAction):
tool_calls_made.append(formatted.tool)
return self._execute_text_tool_with_events(formatted)
if not answer:
raise ValueError("Empty response from LLM")
# Raw text response — treat as the step result
return answer_str
answer_str = str(answer)
formatted = process_llm_response(answer_str, use_stop_words)
if isinstance(formatted, AgentFinish):
return str(formatted.output)
if isinstance(formatted, AgentAction):
tool_calls_made.append(formatted.tool)
tool_result = self._execute_text_tool_with_events(formatted)
last_tool_result = tool_result
# Append the assistant's reasoning + action, then the observation.
# _build_observation_message handles vision sentinels so the LLM
# receives an image content block instead of raw base64 text.
messages.append({"role": "assistant", "content": answer_str})
messages.append(self._build_observation_message(tool_result))
continue
# Raw text response with no Final Answer marker — treat as done
return answer_str
# Max iterations reached — return the last tool result we accumulated
return last_tool_result
def _execute_text_tool_with_events(self, formatted: AgentAction) -> str:
"""Execute text-parsed tool calls with tool usage events."""
@@ -365,6 +423,52 @@ class StepExecutor:
return {"input": stripped_input}
return {"input": str(tool_input)}
# ------------------------------------------------------------------
# Internal: Vision support
# ------------------------------------------------------------------
@staticmethod
def _parse_vision_sentinel(raw: str) -> tuple[str, str] | None:
"""Parse a VISION_IMAGE sentinel into (media_type, base64_data), or None."""
_PREFIX = "VISION_IMAGE:"
if not raw.startswith(_PREFIX):
return None
rest = raw[len(_PREFIX):]
sep = rest.find(":")
if sep <= 0:
return None
return rest[:sep], rest[sep + 1:]
@staticmethod
def _build_observation_message(tool_result: str) -> LLMMessage:
"""Build an observation message, converting vision sentinels to image blocks.
When a tool returns a VISION_IMAGE sentinel (e.g. from read_image),
we build a multimodal content block so the LLM can actually *see*
the image rather than receiving a wall of base64 text.
Uses the standard image_url / data-URI format so each LLM provider's
SDK (OpenAI, LiteLLM, etc.) handles the provider-specific conversion.
Format: ``VISION_IMAGE:<media_type>:<base64_data>``
"""
parsed = StepExecutor._parse_vision_sentinel(tool_result)
if parsed:
media_type, b64_data = parsed
return {
"role": "user",
"content": [
{"type": "text", "text": "Observation: Here is the image:"},
{
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{b64_data}",
},
},
],
}
return {"role": "user", "content": f"Observation: {tool_result}"}
def _validate_expected_tool_usage(
self,
todo: TodoItem,
@@ -393,31 +497,47 @@ class StepExecutor:
self,
messages: list[LLMMessage],
tool_calls_made: list[str],
max_step_iterations: int = 15,
) -> str:
"""Execute step using native function calling (single LLM call).
"""Execute step using native function calling with a multi-turn loop.
Calls the LLM once with the tool schema. If tool calls are returned,
executes them and returns their results. If a text answer, returns it.
No retry loop — the PlannerObserver handles recovery.
Iterates LLM call → tool execution → appended results until the LLM
returns a text answer (no more tool calls) or max_step_iterations is
reached. This lets the agent run a shell command, observe the output,
correct mistakes, and issue follow-up commands — all within one step.
"""
answer = self.llm.call(
messages,
tools=self._openai_tools,
callbacks=self.callbacks,
from_task=self.task,
from_agent=self.agent,
)
accumulated_results: list[str] = []
if not answer:
raise ValueError("Empty response from LLM")
for _ in range(max_step_iterations):
answer = self.llm.call(
messages,
tools=self._openai_tools,
callbacks=self.callbacks,
from_task=self.task,
from_agent=self.agent,
)
if isinstance(answer, list) and answer and is_tool_call_list(answer):
return self._execute_native_tool_calls(answer, messages, tool_calls_made)
if not answer:
raise ValueError("Empty response from LLM")
if isinstance(answer, BaseModel):
return answer.model_dump_json()
if isinstance(answer, BaseModel):
return answer.model_dump_json()
return str(answer)
if isinstance(answer, list) and answer and is_tool_call_list(answer):
# _execute_native_tool_calls appends assistant + tool messages
# to `messages` as a side-effect, so the next LLM call will
# see the full conversation history including tool outputs.
result = self._execute_native_tool_calls(
answer, messages, tool_calls_made
)
accumulated_results.append(result)
continue
# Text answer → LLM decided the step is done
return str(answer)
# Max iterations reached — return everything we accumulated
return "\n".join(filter(None, accumulated_results))
def _execute_native_tool_calls(
self,
@@ -457,9 +577,32 @@ class StepExecutor:
return str(call_result.result)
if call_result.tool_message:
messages.append(call_result.tool_message)
content = call_result.tool_message.get("content", "")
if content:
tool_results.append(str(content))
raw_content = call_result.tool_message.get("content", "")
if isinstance(raw_content, str):
parsed = self._parse_vision_sentinel(raw_content)
if parsed:
media_type, b64_data = parsed
# Replace the sentinel with a standard image_url content block.
# Each provider SDK (LiteLLM → Anthropic, OpenAI native, etc.)
# converts the data-URI to its own wire format.
modified = dict(call_result.tool_message)
modified["content"] = [
{
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{b64_data}",
},
}
]
messages.append(modified)
tool_results.append("[image]")
else:
messages.append(call_result.tool_message)
if raw_content:
tool_results.append(raw_content)
else:
messages.append(call_result.tool_message)
if raw_content:
tool_results.append(str(raw_content))
return "\n".join(tool_results) if tool_results else ""

View File

@@ -429,6 +429,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._planner_observer = PlannerObserver(
agent=self.agent,
task=self.task,
kickoff_input=getattr(self, "_kickoff_input", ""),
)
return self._planner_observer
@@ -437,12 +438,14 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
Returns:
The reasoning effort level: "low", "medium", or "high".
Defaults to "low" if no planning config is set.
Defaults to "medium" if no planning config is set so that
step failures reliably trigger replanning rather than being
silently ignored.
"""
config = getattr(self.agent, "planning_config", None)
if config is not None and hasattr(config, "reasoning_effort"):
return config.reasoning_effort
return "low"
return "medium"
def _build_context_for_todo(self, todo: TodoItem) -> StepExecutionContext:
"""Build an isolated execution context for a single todo.
@@ -554,16 +557,43 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
@router("step_observed_low")
def handle_step_observed_low(
self,
) -> Literal["continue_plan"]:
) -> Literal["continue_plan", "replan_now"]:
"""Low reasoning effort: mark step complete and continue linearly.
Skips the entire decide/replan/refine pipeline. The observation
still validates success, but execution proceeds regardless.
Skips the refine/goal-achieved pipeline but still gates on hard
failures: if the observer says the step failed AND a full replan is
needed, we route to ``replan_now`` rather than blindly continuing.
This prevents cascading failures where every subsequent step builds
on a broken foundation.
"""
current_todo = self.state.todos.current_todo
if not current_todo:
return "continue_plan"
observation = self.state.observations.get(current_todo.step_number)
# Even at low effort, don't ignore a hard step failure.
# A hard failure is one where the step did not succeed AND a replan
# is explicitly required (e.g. required tool not found, permission
# denied, environment misconfiguration).
if (
observation
and not observation.step_completed_successfully
and observation.needs_full_replan
):
if self.agent.verbose:
self._printer.print(
content=(
f"[Low] Step {current_todo.step_number} hard-failed "
f"— triggering replan: {observation.replan_reason}"
),
color="yellow",
)
self.state.last_replan_reason = (
observation.replan_reason or "Step did not complete successfully"
)
return "replan_now"
self.state.todos.mark_completed(
current_todo.step_number, result=current_todo.result
)
@@ -610,14 +640,37 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
)
return "continue_plan"
# Step failed — trigger replan
# Step failed — only replan if observer explicitly requires it,
# otherwise mark done and continue (same gate as low-effort).
if observation.needs_full_replan:
if self.agent.verbose:
self._printer.print(
content=(
f"[Medium] Step {current_todo.step_number} failed + replan required "
f"— triggering replan: {observation.replan_reason}"
),
color="yellow",
)
self.state.last_replan_reason = (
observation.replan_reason or "Step did not complete successfully"
)
return "replan_now"
# Step failed but observer does not require a full replan — continue
self.state.todos.mark_completed(
current_todo.step_number, result=current_todo.result
)
if self.agent.verbose:
completed = self.state.todos.completed_count
total = len(self.state.todos.items)
self._printer.print(
content=f"[Medium] Step {current_todo.step_number} failed — triggering replan",
content=(
f"[Medium] Step {current_todo.step_number} failed but no replan needed "
f"({completed}/{total}) — continuing"
),
color="yellow",
)
self.state.last_replan_reason = "Step did not complete successfully"
return "replan_now"
return "continue_plan"
# -- High effort: full observation pipeline (existing behavior) --
@@ -793,9 +846,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
Preserves completed todo results and replaces only pending steps.
"""
max_replans = 3
self.state.replan_count += 1
if self.state.replan_count > max_replans:
if self.state.replan_count >= max_replans:
if self.agent.verbose:
self._printer.print(
content=f"Max replans ({max_replans}) reached — finalizing with current results",
@@ -803,6 +855,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
)
return "all_todos_complete"
self.state.replan_count += 1
reason = self.state.last_replan_reason or "Dynamic replan triggered"
completed = self.state.todos.get_completed_todos()
@@ -2405,19 +2458,36 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
output = planning_handler.handle_agent_reasoning()
self.task.description = original_description
else:
planning_handler.description = enhanced_description
# description is a read-only property — recreate with enhanced text
input_text = getattr(self, "_kickoff_input", "")
planning_handler = AgentReasoning(
agent=self.agent,
description=enhanced_description or input_text or "Complete the requested task",
expected_output="Complete the task successfully",
)
output = planning_handler.handle_agent_reasoning()
# Reset todos with new plan
# Update plan metadata and replace only pending todos,
# preserving completed history for context and synthesis.
self.state.plan = output.plan.plan
self.state.plan_ready = output.plan.ready
if self.state.plan_ready and output.plan.steps:
self._create_todos_from_plan(output.plan.steps)
new_todos = [
TodoItem(
step_number=step.step_number,
description=step.description,
tool_to_use=step.tool_to_use,
depends_on=step.depends_on,
status="pending",
)
for step in output.plan.steps
]
self.state.todos.replace_pending_todos(new_todos)
if self.agent.verbose:
self._printer.print(
content=f"New plan created with {len(output.plan.steps)} steps",
content=f"Replan: {len(new_todos)} new steps (completed history preserved)",
color="green",
)

View File

@@ -80,21 +80,22 @@
"refine_plan_prompt": "Your plan:\n{current_plan}\n\nYou indicated you're not ready. Address the specific gap while keeping the plan minimal.\n\nConclude with READY or NOT READY."
},
"planning": {
"system_prompt": "You are a strategic planning assistant. Create minimal, effective execution plans. Prefer fewer steps over more.",
"create_plan_prompt": "Create a focused execution plan for the following task:\n\n## Task\n{description}\n\n## Expected Output\n{expected_output}\n\n## Available Tools\n{tools}\n\n## Planning Principles\nFocus on WHAT needs to be accomplished, not HOW. Group related actions into logical units. Fewer steps = better. Most tasks need 3-6 steps. Hard limit: {max_steps} steps.\n\n## Step Types (only these are valid):\n1. **Tool Step**: Uses a tool to gather information or take action\n2. **Output Step**: Synthesizes prior results into the final deliverable (usually the last step)\n\n## Rules:\n- Each step must either USE A TOOL or PRODUCE THE FINAL OUTPUT\n- Combine related tool calls: \"Research A, B, and C\" = ONE step, not three\n- Combine all synthesis into ONE final output step\n- NO standalone \"thinking\" steps (review, verify, confirm, refine, analyze) - these happen naturally between steps\n\nFor each step: State the action, specify the tool (if any), and note dependencies.\n\nAfter your plan, state READY or NOT READY.",
"system_prompt": "You are a strategic planning assistant. Create concrete, executable plans where every step produces a verifiable result.",
"create_plan_prompt": "Create an execution plan for the following task:\n\n## Task\n{description}\n\n## Expected Output\n{expected_output}\n\n## Available Tools\n{tools}\n\n## Planning Principles\nFocus on CONCRETE, EXECUTABLE steps. Each step must clearly state WHAT ACTION to take and HOW to verify it succeeded. The number of steps should match the task complexity. Hard limit: {max_steps} steps.\n\n## Rules:\n- Each step must have a clear DONE criterion\n- Do NOT group unrelated actions: if steps can fail independently, keep them separate\n- NO standalone \"thinking\" or \"planning\" steps — act, don't just observe\n- The last step must produce the required output\n\nAfter your plan, state READY or NOT READY.",
"refine_plan_prompt": "Your previous plan:\n{current_plan}\n\nYou indicated you weren't ready. Refine your plan to address the specific gap.\n\nKeep the plan minimal - only add steps that directly address the issue.\n\nConclude with READY or NOT READY as before.",
"observation_system_prompt": "You are a Planning Agent observing execution progress. After each step completes, you analyze what happened and decide whether the remaining plan is still valid.\n\nReason step-by-step about:\n1. What new information was learned from this step's result\n2. Whether the remaining steps still make sense given this new information\n3. What refinements, if any, are needed for upcoming steps\n4. Whether the overall goal has already been achieved\n\nBe conservative about triggering full replans — only do so when the remaining plan is fundamentally wrong, not just suboptimal.",
"observation_system_prompt": "You are a Planning Agent observing execution progress. After each step completes, you analyze what happened and decide whether the remaining plan is still valid.\n\nReason step-by-step about:\n1. Did this step produce a concrete, verifiable result? (file created, command succeeded, service running, etc.) — or did it only explore without acting?\n2. What new information was learned from this step's result?\n3. Whether the remaining steps still make sense given this new information\n4. What refinements, if any, are needed for upcoming steps\n5. Whether the overall goal has already been achieved\n\nCritical: mark `step_completed_successfully=false` if:\n- The step result is only exploratory (ls, pwd, cat) without producing the required artifact or action\n- A command returned a non-zero exit code and the error was not recovered\n- The step description required creating/building/starting something and the result shows it was not done\n\nBe conservative about triggering full replans — only do so when the remaining plan is fundamentally wrong, not just suboptimal.\n\nIMPORTANT: Set step_completed_successfully=false if:\n- The step's stated goal was NOT achieved (even if other things were done)\n- The first meaningful action returned an error (file not found, command not found, etc.)\n- The result is exploration/discovery output rather than the concrete action the step required\n- The step ran out of attempts without producing the required output\nSet needs_full_replan=true if the current plan's remaining steps reference paths or state that don't exist yet and need to be created first.",
"observation_user_prompt": "## Original task\n{task_description}\n\n## Expected output\n{task_goal}\n{completed_summary}\n\n## Just completed step {step_number}\nDescription: {step_description}\nResult: {step_result}\n{remaining_summary}\n\nAnalyze this step's result and provide your observation.",
"step_executor_system_prompt": "You are {role}. {backstory}\n\nYour goal: {goal}\n\nYou are executing a specific step in a multi-step plan. Focus ONLY on completing the current step. Do not plan ahead or worry about future steps.\n\nBefore acting, briefly reason about what you need to do and which approach or tool would be most helpful for this specific step.{tools_section}",
"step_executor_tools_section": "\n\nAvailable tools: {tool_names}\n\nTo use a tool, respond with:\nThought: <your reasoning>\nAction: <tool_name>\nAction Input: <input>\n\nWhen you have the final answer, respond with:\nThought: <your reasoning>\nFinal Answer: <your answer>",
"step_executor_system_prompt": "You are {role}. {backstory}\n\nYour goal: {goal}\n\nYou are executing ONE specific step in a larger plan. Your ONLY job is to fully complete this step — not to plan ahead.\n\nKey rules:\n- **ACT FIRST.** Execute the primary action of this step immediately. Do NOT read or explore files before attempting the main action unless exploration IS the step's goal.\n- If the step says 'run X', run X NOW. If it says 'write file Y', write Y NOW.\n- If the step requires producing an output file (e.g. /app/move.txt, report.jsonl, summary.csv), you MUST write that file using a tool call — do NOT just state the answer in text.\n- You may use tools MULTIPLE TIMES. After each tool use, check the result. If it failed, try a different approach.\n- Only output your Final Answer AFTER the concrete outcome is verified (file written, build succeeded, command exited 0).\n- If a command is not found or a path does not exist, fix it (different PATH, install missing deps, use absolute paths).\n- Do NOT spend more than 3 tool calls on exploration/analysis before attempting the primary action.{tools_section}",
"step_executor_tools_section": "\n\nAvailable tools: {tool_names}\n\nYou may call tools multiple times in sequence. Use this format for EACH tool call:\nThought: <what you observed and what you will try next>\nAction: <tool_name>\nAction Input: <input>\n\nAfter observing each result, decide: is the step complete? If yes:\nThought: The step is done because <evidence>\nFinal Answer: <concise summary of what was accomplished and the key result>",
"step_executor_user_prompt": "## Current Step\n{step_description}",
"step_executor_suggested_tool": "\nSuggested tool: {tool_to_use}",
"step_executor_context_header": "\n## Context from previous steps:",
"step_executor_context_entry": "Step {step_number} result: {result}",
"step_executor_complete_step": "\nComplete this step and provide your result.",
"step_executor_complete_step": "\n**Execute the primary action of this step NOW.** If the step requires writing a file, write it. If it requires running a command, run it. Verify the outcome with a follow-up tool call, then give your Final Answer. Your Final Answer must confirm what was DONE (file created at path X, command succeeded), not just what should be done.",
"todo_system_prompt": "You are {role}. Your goal: {goal}\n\nYou are executing a specific step in a multi-step plan. Focus only on completing the current step. Use the suggested tool if one is provided. Be concise and provide clear results that can be used by subsequent steps.",
"synthesis_system_prompt": "You are {role}. You have completed a multi-step task. Synthesize the results from all steps into a single, coherent final response that directly addresses the original task. Do NOT list step numbers or say 'Step 1 result'. Produce a clean, polished answer as if you did it all at once.",
"synthesis_user_prompt": "## Original Task\n{task_description}\n\n## Results from each step\n{combined_steps}\n\nSynthesize these results into a single, coherent final answer.",
"replan_enhancement_prompt": "\n\nIMPORTANT: Previous execution attempt did not fully succeed. Please create a revised plan that accounts for the following context from the previous attempt:\n\n{previous_context}\n\nConsider:\n1. What steps succeeded and can be built upon\n2. What steps failed and why they might have failed\n3. Alternative approaches that might work better\n4. Whether dependencies need to be restructured"
"replan_enhancement_prompt": "\n\nIMPORTANT: Previous execution attempt did not fully succeed. Please create a revised plan that accounts for the following context from the previous attempt:\n\n{previous_context}\n\nConsider:\n1. What steps succeeded and can be built upon\n2. What steps failed and why they might have failed\n3. Alternative approaches that might work better\n4. Whether dependencies need to be restructured",
"step_executor_task_context": "## Task Context\nThe following is the full task you are helping complete. Keep this in mind — especially any required output files, exact filenames, and expected formats.\n\n{task_context}\n\n---\n"
}
}
}

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from typing import Literal
from uuid import uuid4
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator
# Todo status type
@@ -234,6 +234,14 @@ class StepObservation(BaseModel):
"Applied directly — no separate replan needed."
),
)
@field_validator("suggested_refinements", mode="before")
@classmethod
def coerce_single_refinement_to_list(cls, v):
"""Coerce a single dict refinement into a list to handle LLM returning a single object."""
if isinstance(v, dict):
return [v]
return v
needs_full_replan: bool = Field(
default=False,
description="The remaining plan is fundamentally wrong and must be regenerated",